diff options
author | Håvard Pettersen <havardpe@oath.com> | 2020-10-30 10:06:37 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2020-10-30 10:06:37 +0000 |
commit | 2e686a231d377198331d4a6c9971b32a83a22bbc (patch) | |
tree | a7f25af1bc267dd4b39cae66a3505734bd9e58b2 /vespalib | |
parent | 9a6ee1f2287a679f31cb3706953dede231f13bb3 (diff) |
enable explicitly specifying the participation id
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/rendezvous/rendezvous_test.cpp | 193 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rendezvous.h | 31 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rendezvous.hpp | 93 |
3 files changed, 247 insertions, 70 deletions
diff --git a/vespalib/src/tests/rendezvous/rendezvous_test.cpp b/vespalib/src/tests/rendezvous/rendezvous_test.cpp index bce25692760..f4ec7870ad5 100644 --- a/vespalib/src/tests/rendezvous/rendezvous_test.cpp +++ b/vespalib/src/tests/rendezvous/rendezvous_test.cpp @@ -1,7 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/rendezvous.h> +#include <vespa/vespalib/util/time.h> #include <utility> +#include <thread> using namespace vespalib; @@ -10,28 +12,45 @@ struct Value { Value() : value(42) {} }; -template <typename T> -struct Empty : Rendezvous<int, T> { - Empty(size_t n) : Rendezvous<int, T>(n) {} +template <typename T, bool ext_id> +struct Empty : Rendezvous<int, T, ext_id> { + Empty(size_t n) : Rendezvous<int, T, ext_id>(n) {} void mingle() override {} - T meet() { return this->rendezvous(0); } + T meet(size_t thread_id) { + if constexpr (ext_id) { + return this->rendezvous(0, thread_id); + } else { + (void) thread_id; + return this->rendezvous(0); + } + } }; -struct Add : Rendezvous<size_t, std::pair<size_t, size_t> > { - Add(size_t n) : Rendezvous<size_t, std::pair<size_t, size_t> >(n) {} +template <bool ext_id> +struct Add : Rendezvous<size_t, std::pair<size_t, size_t>, ext_id> { + using Super = Rendezvous<size_t, std::pair<size_t, size_t>, ext_id>; + using Super::size; + using Super::in; + using Super::out; + Add(size_t n) : Super(n) {} void mingle() override { size_t sum = 0; for (size_t i = 0; i < size(); ++i) { sum += in(i); } - for (size_t i = 0; i < size(); ++i) { + for (size_t i = 0; i < this->size(); ++i) { out(i) = std::make_pair(sum, in(0)); } } }; -struct Modify : Rendezvous<size_t, size_t> { - Modify(size_t n) : Rendezvous<size_t, size_t>(n) {} +template <bool ext_id> +struct Modify : Rendezvous<size_t, size_t, ext_id> { + using Super = Rendezvous<size_t, size_t, ext_id>; + using Super::size; + using Super::in; + using Super::out; + Modify(size_t n) : Super(n) {} void mingle() override { for (size_t i = 0; i < size(); ++i) { in(i) += 1; @@ -42,60 +61,122 @@ struct Modify : Rendezvous<size_t, size_t> { } }; -template <typename T> -struct Swap : Rendezvous<T, T> { - using Rendezvous<T, T>::in; - using Rendezvous<T, T>::out; - Swap() : Rendezvous<T, T>(2) {} +template <typename T, bool ext_id> +struct Swap : Rendezvous<T, T, ext_id> { + using Super = Rendezvous<T, T, ext_id>; + using Super::size; + using Super::in; + using Super::out; + Swap() : Super(2) {} void mingle() override { out(0) = std::move(in(1)); out(1) = std::move(in(0)); } }; +template <bool ext_id> +struct DetectId : Rendezvous<int, size_t, ext_id> { + using Super = Rendezvous<int, size_t, ext_id>; + using Super::size; + using Super::in; + using Super::out; + DetectId(size_t n) : Super(n) {} + void mingle() override { + for (size_t i = 0; i < size(); ++i) { + out(i) = i; + } + } + size_t meet(size_t thread_id) { + if constexpr (ext_id) { + return this->rendezvous(0, thread_id); + } else { + (void) thread_id; + return this->rendezvous(0); + } + } +}; + +struct Any : Rendezvous<bool, bool> { + Any(size_t n) : Rendezvous<bool, bool>(n) {} + void mingle() override { + bool result = false; + for (size_t i = 0; i < size(); ++i) { + result |= in(i); + } + for (size_t i = 0; i < size(); ++i) { + out(i) = result; + } + } + bool check(bool flag) { return this->rendezvous(flag); } +}; + TEST("require that creating an empty rendezvous will fail") { - EXPECT_EXCEPTION(Add(0), IllegalArgumentException, ""); + EXPECT_EXCEPTION(Add<false>(0), IllegalArgumentException, ""); + EXPECT_EXCEPTION(Add<true>(0), IllegalArgumentException, ""); } -TEST_F("require that a single thread can mingle with itself within a rendezvous", Add(1)) { +TEST_FF("require that a single thread can mingle with itself within a rendezvous", Add<false>(1), Add<true>(1)) { EXPECT_EQUAL(10u, f1.rendezvous(10).first); EXPECT_EQUAL(20u, f1.rendezvous(20).first); EXPECT_EQUAL(30u, f1.rendezvous(30).first); + EXPECT_EQUAL(10u, f2.rendezvous(10, thread_id).first); + EXPECT_EQUAL(20u, f2.rendezvous(20, thread_id).first); + EXPECT_EQUAL(30u, f2.rendezvous(30, thread_id).first); } -TEST_MT_F("require that rendezvous can mingle multiple threads", 10, Add(num_threads)) { +TEST_MT_FF("require that rendezvous can mingle multiple threads", 10, Add<false>(num_threads), Add<true>(num_threads)) { EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); + EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first); } -typedef Empty<Value> Empty1; -typedef Empty<size_t> Empty2; -TEST_MT_FF("require that unset rendezvous outputs are default constructed", 10, Empty1(num_threads), Empty2(num_threads)) { - EXPECT_EQUAL(42u, f1.meet().value); - EXPECT_EQUAL(0u, f2.meet()); +template <bool ext_id> using Empty1 = Empty<Value, ext_id>; +template <bool ext_id> using Empty2 = Empty<size_t, ext_id>; + +TEST_MT_FFFF("require that unset rendezvous outputs are default constructed", 10, + Empty1<false>(num_threads), Empty2<false>(num_threads), + Empty1<true>(num_threads), Empty2<true>(num_threads)) +{ + EXPECT_EQUAL(42u, f1.meet(thread_id).value); + EXPECT_EQUAL(0u, f2.meet(thread_id)); + EXPECT_EQUAL(42u, f3.meet(thread_id).value); + EXPECT_EQUAL(0u, f4.meet(thread_id)); } -TEST_MT_FF("require that mingle is not called until all threads are present", 3, Add(num_threads), - CountDownLatch(num_threads - 1)) +TEST_MT_FFFF("require that mingle is not called until all threads are present", 3, + Add<false>(num_threads), CountDownLatch(num_threads - 1), + Add<true>(num_threads), CountDownLatch(num_threads - 1)) { - if (thread_id == 0) { - EXPECT_FALSE(f2.await(20)); - EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first); - EXPECT_TRUE(f2.await(25000)); - } else { - EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first); - f2.countDown(); + for (bool ext_id: {false, true}) { + CountDownLatch &latch = ext_id ? f4 : f2; + if (thread_id == 0) { + EXPECT_FALSE(latch.await(20)); + if (ext_id) { + EXPECT_EQUAL(3u, f3.rendezvous(thread_id, thread_id).first); + } else { + EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first); + } + EXPECT_TRUE(latch.await(25000)); + } else { + if (ext_id) { + EXPECT_EQUAL(3u, f3.rendezvous(thread_id, thread_id).first); + } else { + EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first); + } + latch.countDown(); + } } } -TEST_MT_F("require that rendezvous can be used multiple times", 10, Add(num_threads)) { - EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); - EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); +TEST_MT_FF("require that rendezvous can be used multiple times", 10, Add<false>(num_threads), Add<true>(num_threads)) { EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); + EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first); EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); + EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first); EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first); + EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first); } -TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Add(10), CountDownLatch(10)) { +TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Add<false>(10), CountDownLatch(10)) { auto res = f1.rendezvous(thread_id); TEST_BARRIER(); if (res.second == thread_id) { @@ -105,16 +186,46 @@ TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Ad EXPECT_TRUE(f2.await(25000)); } -TEST_MT_F("require that mingle can modify its own copy of input values", 10, Modify(num_threads)) { +TEST_MT_FF("require that mingle can modify its own copy of input values", 10, Modify<false>(num_threads), Modify<true>(num_threads)) { size_t my_input = thread_id; - size_t my_output = f1.rendezvous(my_input); + size_t my_output1 = f1.rendezvous(my_input); + size_t my_output2 = f2.rendezvous(my_input, thread_id); EXPECT_EQUAL(my_input, thread_id); - EXPECT_EQUAL(my_output, thread_id + 1); + EXPECT_EQUAL(my_output1, thread_id + 1); + EXPECT_EQUAL(my_output2, thread_id + 1); } -TEST_MT_F("require that threads can exchange non-copyable state", 2, Swap<std::unique_ptr<size_t> >()) { - auto other = f1.rendezvous(std::make_unique<size_t>(thread_id)); - EXPECT_EQUAL(*other, 1 - thread_id); +using Swap_false = Swap<std::unique_ptr<size_t>,false>; +using Swap_true = Swap<std::unique_ptr<size_t>,true>; + +TEST_MT_FF("require that threads can exchange non-copyable state", 2, Swap_false(), Swap_true()) { + auto other1 = f1.rendezvous(std::make_unique<size_t>(thread_id)); + EXPECT_EQUAL(*other1, 1 - thread_id); + auto other2 = f2.rendezvous(std::make_unique<size_t>(thread_id), thread_id); + EXPECT_EQUAL(*other2, 1 - thread_id); +} + +TEST_MT_F("require that participation id can be explicitly defined", 10, DetectId<true>(num_threads)) { + for (size_t i = 0; i < 128; ++i) { + size_t my_id = f1.meet(thread_id); + EXPECT_EQUAL(my_id, thread_id); + } +} + +TEST_MT_FF("require that participation id is unstable when not explicitly defined", 10, DetectId<false>(num_threads), Any(num_threads)) { + bool id_mismatch = false; + size_t old_id = f1.meet(thread_id); + for (size_t i = 0; !id_mismatch; ++i) { + if ((i % num_threads) == thread_id) { + std::this_thread::sleep_for(std::chrono::milliseconds(i)); + } + size_t new_id = f1.meet(thread_id); + if (new_id != old_id) { + id_mismatch = true; + } + id_mismatch = f2.check(id_mismatch); + } + EXPECT_TRUE(id_mismatch); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.h b/vespalib/src/vespa/vespalib/util/rendezvous.h index be589f57b9d..e7259117272 100644 --- a/vespalib/src/vespa/vespalib/util/rendezvous.h +++ b/vespalib/src/vespa/vespalib/util/rendezvous.h @@ -2,6 +2,7 @@ #pragma once +#include <type_traits> #include <condition_variable> #include <vector> @@ -18,7 +19,7 @@ namespace vespalib { * subclass needs to implement the mingle function to supply the * application logic. **/ -template <typename IN, typename OUT> +template <typename IN, typename OUT, bool external_id = false> class Rendezvous { private: @@ -36,6 +37,17 @@ private: **/ virtual void mingle() = 0; + /** + * lock-free version for when there is only one thread meeting + * itself. + **/ + void meet_self(IN &input, OUT &output); + + /** + * general version for when there are multiple threads meeting. + **/ + void meet_others(IN &input, OUT &output, size_t my_id, std::unique_lock<std::mutex> guard); + protected: /** * Obtain the number of input and output values to be handled by @@ -81,7 +93,22 @@ public: * @return output parameter for a single thread * @param input input parameter for a single thread **/ - OUT rendezvous(IN input); + template <bool ext_id = external_id> + typename std::enable_if<!ext_id,OUT>::type rendezvous(IN input); + + /** + * Called by individual threads to synchronize execution and share + * state with the mingle function where each caller has a + * pre-defined participation id (enable by setting the external_id + * template flag). + * + * @return output parameter for a single thread + * @param input input parameter for a single thread + * @param my_id participant id for this thread (must be in range and + * not conflicting with other threads) + **/ + template <bool ext_id = external_id> + typename std::enable_if<ext_id,OUT>::type rendezvous(IN input, size_t my_id); }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.hpp b/vespalib/src/vespa/vespalib/util/rendezvous.hpp index 2af5a55c8ab..284b536460a 100644 --- a/vespalib/src/vespa/vespalib/util/rendezvous.hpp +++ b/vespalib/src/vespa/vespalib/util/rendezvous.hpp @@ -1,52 +1,91 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "exceptions.h" +#include <cassert> namespace vespalib { -template <typename IN, typename OUT> -Rendezvous<IN, OUT>::Rendezvous(size_t n) +template <typename IN, typename OUT, bool external_id> +void +Rendezvous<IN, OUT, external_id>::meet_self(IN &input, OUT &output) { + _in[0] = &input; + _out[0] = &output; + mingle(); +} + +template <typename IN, typename OUT, bool external_id> +void +Rendezvous<IN, OUT, external_id>::meet_others(IN &input, OUT &output, size_t my_id, std::unique_lock<std::mutex> guard) +{ + if (external_id) { + assert(_in[my_id] == nullptr); + assert(_out[my_id] == nullptr); + } + _in[my_id] = &input; + _out[my_id] = &output; + if (++_next == _size) { + mingle(); + if (external_id) { + std::fill(_in.begin(), _in.end(), nullptr); + std::fill(_out.begin(), _out.end(), nullptr); + } + _next = 0; + ++_gen; + _cond.notify_all(); + } else { + size_t oldgen = _gen; + while (oldgen == _gen) { + _cond.wait(guard); + } + } +} + +template <typename IN, typename OUT, bool external_id> +Rendezvous<IN, OUT, external_id>::Rendezvous(size_t n) : _lock(), _cond(), _size(n), _next(0), _gen(0), - _in(n, 0), - _out(n, 0) + _in(n, nullptr), + _out(n, nullptr) { if (n == 0) { throw IllegalArgumentException("size must be greater than 0"); } } -template <typename IN, typename OUT> -Rendezvous<IN, OUT>::~Rendezvous() = default; +template <typename IN, typename OUT, bool external_id> +Rendezvous<IN, OUT, external_id>::~Rendezvous() = default; -template <typename IN, typename OUT> -OUT -Rendezvous<IN, OUT>::rendezvous(IN input) +template <typename IN, typename OUT, bool external_id> +template <bool ext_id> +typename std::enable_if<!ext_id,OUT>::type +Rendezvous<IN, OUT, external_id>::rendezvous(IN input) { - OUT ret = OUT(); + OUT ret{}; + static_assert(ext_id == external_id); if (_size == 1) { - _in[0] = &input; - _out[0] = &ret; - mingle(); + meet_self(input, ret); } else { std::unique_lock guard(_lock); - size_t me = _next++; - _in[me] = &input; - _out[me] = &ret; - if (_next == _size) { - mingle(); - _next = 0; - ++_gen; - _cond.notify_all(); - } else { - size_t oldgen = _gen; - while (oldgen == _gen) { - _cond.wait(guard); - } - } + meet_others(input, ret, _next, std::move(guard)); + } + return ret; +} + +template <typename IN, typename OUT, bool external_id> +template <bool ext_id> +typename std::enable_if<ext_id,OUT>::type +Rendezvous<IN, OUT, external_id>::rendezvous(IN input, size_t my_id) +{ + OUT ret{}; + assert(my_id < _size); + static_assert(ext_id == external_id); + if (_size == 1) { + meet_self(input, ret); + } else { + meet_others(input, ret, my_id, std::unique_lock(_lock)); } return ret; } |