aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp50
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp11
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h12
-rw-r--r--vespalib/src/vespa/vespalib/util/thread_bundle.cpp6
-rw-r--r--vespalib/src/vespa/vespalib/util/thread_bundle.h35
5 files changed, 99 insertions, 15 deletions
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
index debb34724f6..d6e0e473e59 100644
--- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
+++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
@@ -201,4 +201,54 @@ TEST_MT_FF("require that bundle pool works with multiple threads", 32, SimpleThr
f1.release(std::move(bundle));
}
+struct Filler {
+ int stuff;
+ Filler() : stuff(0) {}
+ virtual ~Filler() {}
+};
+
+struct Proxy : Filler, Runnable {
+ Runnable ⌖
+ Proxy(Runnable &target_in) : target(target_in) {}
+ void run() override { target.run(); }
+};
+
+struct AlmostRunnable : Runnable {};
+
+TEST("require that Proxy needs fixup to become Runnable") {
+ Cnt cnt;
+ Proxy proxy(cnt);
+ Runnable &runnable = proxy;
+ void *proxy_ptr = &proxy;
+ void *runnable_ptr = &runnable;
+ EXPECT_TRUE(proxy_ptr != runnable_ptr);
+}
+
+TEST_FF("require that various versions of run can be used to invoke targets", SimpleThreadBundle(5), State(5)) {
+ EXPECT_TRUE(ThreadBundle::is_runnable_ptr<Runnable*>());
+ EXPECT_TRUE(ThreadBundle::is_runnable_ptr<Runnable::UP>());
+ EXPECT_FALSE(ThreadBundle::is_runnable_ptr<std::unique_ptr<Proxy>>());
+ EXPECT_FALSE(ThreadBundle::is_runnable_ptr<std::unique_ptr<AlmostRunnable>>());
+ std::vector<Runnable::UP> direct;
+ std::vector<std::unique_ptr<Proxy>> custom;
+ for (Runnable &target: f2.cnts) {
+ direct.push_back(std::make_unique<Proxy>(target));
+ custom.push_back(std::make_unique<Proxy>(target));
+ }
+ std::vector<Runnable*> refs = f2.getTargets(5);
+ f2.check({0,0,0,0,0});
+ f1.run(refs.data(), 3);
+ f2.check({1,1,1,0,0});
+ f1.run(&refs[3], 2);
+ f2.check({1,1,1,1,1});
+ f1.run(refs);
+ f2.check({2,2,2,2,2});
+ f1.run(direct);
+ f2.check({3,3,3,3,3});
+ f1.run(custom);
+ f2.check({4,4,4,4,4});
+ f1.run(f2.cnts);
+ f2.check({5,5,5,5,5});
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 99ab298864f..8a66a4f6898 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -157,20 +157,21 @@ SimpleThreadBundle::size() const
}
void
-SimpleThreadBundle::run(const std::vector<Runnable*> &targets)
+SimpleThreadBundle::run(Runnable* const* targets, size_t cnt)
{
- if (targets.size() > size()) {
+ if (cnt > size()) {
throw IllegalArgumentException("too many targets");
}
- if (targets.empty()) {
+ if (cnt == 0) {
return;
}
- if (targets.size() == 1) {
+ if (cnt == 1) {
targets[0]->run();
return;
}
CountDownLatch latch(size());
- _work.targets = &targets;
+ _work.targets = targets;
+ _work.cnt = cnt;
_work.latch = &latch;
_hook->run();
latch.await();
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index b7434d09ac3..569500635a5 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -18,9 +18,10 @@ namespace fixed_thread_bundle {
* support static wiring of signal paths and execution hooks.
**/
struct Work {
- const std::vector<Runnable *> *targets;
+ Runnable* const* targets;
+ size_t cnt;
CountDownLatch *latch;
- Work() : targets(0), latch(0) {}
+ Work() : targets(nullptr), cnt(0), latch(0) {}
};
/**
@@ -30,10 +31,10 @@ struct Part {
const Work &work;
size_t offset;
Part(const Work &w, size_t o) : work(w), offset(o) {}
- bool valid() { return (offset < work.targets->size()); }
+ bool valid() { return (offset < work.cnt); }
void perform() {
if (valid()) {
- (*(work.targets))[offset]->run();
+ work.targets[offset]->run();
}
work.latch->countDown();
}
@@ -130,7 +131,8 @@ public:
: SimpleThreadBundle(size, Runnable::default_init_function, strategy) {}
~SimpleThreadBundle();
size_t size() const override;
- void run(const std::vector<Runnable*> &targets) override;
+ using ThreadBundle::run;
+ void run(Runnable* const* targets, size_t cnt) override;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
index 9f953abfce7..d47ffb2f3ef 100644
--- a/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread_bundle.cpp
@@ -9,10 +9,10 @@ ThreadBundle &
ThreadBundle::trivial() {
struct TrivialThreadBundle : ThreadBundle {
size_t size() const override { return 1; }
- void run(const std::vector<Runnable*> &targets) override {
- if (targets.size() == 1) {
+ void run(Runnable* const* targets, size_t cnt) override {
+ if (cnt == 1) {
targets[0]->run();
- } else if (targets.size() > 1) {
+ } else if (cnt > 1) {
throw IllegalArgumentException("too many targets");
}
};
diff --git a/vespalib/src/vespa/vespalib/util/thread_bundle.h b/vespalib/src/vespa/vespalib/util/thread_bundle.h
index 699fd8e27a0..830d7c76e7c 100644
--- a/vespalib/src/vespa/vespalib/util/thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/thread_bundle.h
@@ -27,7 +27,34 @@ struct ThreadBundle {
* their completion. This function cannot be called with more
* targets than the size of this bundle.
**/
- virtual void run(const std::vector<Runnable*> &targets) = 0;
+ virtual void run(Runnable* const* targets, size_t cnt) = 0;
+
+ // convenience run wrapper
+ void run(const std::vector<Runnable*> &targets) {
+ run(targets.data(), targets.size());
+ }
+
+ // convenience run wrapper
+ void run(const std::vector<Runnable::UP> &targets) {
+ static_assert(sizeof(Runnable::UP) == sizeof(Runnable*));
+ run(reinterpret_cast<Runnable* const*>(targets.data()), targets.size());
+ }
+
+ template <typename T>
+ static constexpr bool is_runnable_ptr() {
+ return (std::is_same_v<T,Runnable*> || std::is_same_v<T,Runnable::UP>);
+ }
+
+ // convenience run wrapper
+ template <typename Item>
+ std::enable_if_t<!is_runnable_ptr<Item>(),void> run(std::vector<Item> &items) {
+ std::vector<Runnable*> targets;
+ targets.reserve(items.size());
+ for (auto &item: items) {
+ targets.push_back(resolve(item));
+ }
+ run(targets);
+ }
/**
* Empty virtual destructor to enable subclassing.
@@ -36,7 +63,11 @@ struct ThreadBundle {
// a thread bundle that can only run things in the current thread.
static ThreadBundle &trivial();
+
+private:
+ Runnable *resolve(Runnable &target) { return &target; }
+ template <typename T>
+ Runnable *resolve(const std::unique_ptr<T> &target) { return target.get(); }
};
} // namespace vespalib
-