summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2020-10-30 10:06:37 +0000
committerHåvard Pettersen <havardpe@oath.com>2020-10-30 10:06:37 +0000
commit2e686a231d377198331d4a6c9971b32a83a22bbc (patch)
treea7f25af1bc267dd4b39cae66a3505734bd9e58b2 /vespalib
parent9a6ee1f2287a679f31cb3706953dede231f13bb3 (diff)
enable explicitly specifying the participation id
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/rendezvous/rendezvous_test.cpp193
-rw-r--r--vespalib/src/vespa/vespalib/util/rendezvous.h31
-rw-r--r--vespalib/src/vespa/vespalib/util/rendezvous.hpp93
3 files changed, 247 insertions, 70 deletions
diff --git a/vespalib/src/tests/rendezvous/rendezvous_test.cpp b/vespalib/src/tests/rendezvous/rendezvous_test.cpp
index bce25692760..f4ec7870ad5 100644
--- a/vespalib/src/tests/rendezvous/rendezvous_test.cpp
+++ b/vespalib/src/tests/rendezvous/rendezvous_test.cpp
@@ -1,7 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/rendezvous.h>
+#include <vespa/vespalib/util/time.h>
#include <utility>
+#include <thread>
using namespace vespalib;
@@ -10,28 +12,45 @@ struct Value {
Value() : value(42) {}
};
-template <typename T>
-struct Empty : Rendezvous<int, T> {
- Empty(size_t n) : Rendezvous<int, T>(n) {}
+template <typename T, bool ext_id>
+struct Empty : Rendezvous<int, T, ext_id> {
+ Empty(size_t n) : Rendezvous<int, T, ext_id>(n) {}
void mingle() override {}
- T meet() { return this->rendezvous(0); }
+ T meet(size_t thread_id) {
+ if constexpr (ext_id) {
+ return this->rendezvous(0, thread_id);
+ } else {
+ (void) thread_id;
+ return this->rendezvous(0);
+ }
+ }
};
-struct Add : Rendezvous<size_t, std::pair<size_t, size_t> > {
- Add(size_t n) : Rendezvous<size_t, std::pair<size_t, size_t> >(n) {}
+template <bool ext_id>
+struct Add : Rendezvous<size_t, std::pair<size_t, size_t>, ext_id> {
+ using Super = Rendezvous<size_t, std::pair<size_t, size_t>, ext_id>;
+ using Super::size;
+ using Super::in;
+ using Super::out;
+ Add(size_t n) : Super(n) {}
void mingle() override {
size_t sum = 0;
for (size_t i = 0; i < size(); ++i) {
sum += in(i);
}
- for (size_t i = 0; i < size(); ++i) {
+ for (size_t i = 0; i < this->size(); ++i) {
out(i) = std::make_pair(sum, in(0));
}
}
};
-struct Modify : Rendezvous<size_t, size_t> {
- Modify(size_t n) : Rendezvous<size_t, size_t>(n) {}
+template <bool ext_id>
+struct Modify : Rendezvous<size_t, size_t, ext_id> {
+ using Super = Rendezvous<size_t, size_t, ext_id>;
+ using Super::size;
+ using Super::in;
+ using Super::out;
+ Modify(size_t n) : Super(n) {}
void mingle() override {
for (size_t i = 0; i < size(); ++i) {
in(i) += 1;
@@ -42,60 +61,122 @@ struct Modify : Rendezvous<size_t, size_t> {
}
};
-template <typename T>
-struct Swap : Rendezvous<T, T> {
- using Rendezvous<T, T>::in;
- using Rendezvous<T, T>::out;
- Swap() : Rendezvous<T, T>(2) {}
+template <typename T, bool ext_id>
+struct Swap : Rendezvous<T, T, ext_id> {
+ using Super = Rendezvous<T, T, ext_id>;
+ using Super::size;
+ using Super::in;
+ using Super::out;
+ Swap() : Super(2) {}
void mingle() override {
out(0) = std::move(in(1));
out(1) = std::move(in(0));
}
};
+template <bool ext_id>
+struct DetectId : Rendezvous<int, size_t, ext_id> {
+ using Super = Rendezvous<int, size_t, ext_id>;
+ using Super::size;
+ using Super::in;
+ using Super::out;
+ DetectId(size_t n) : Super(n) {}
+ void mingle() override {
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = i;
+ }
+ }
+ size_t meet(size_t thread_id) {
+ if constexpr (ext_id) {
+ return this->rendezvous(0, thread_id);
+ } else {
+ (void) thread_id;
+ return this->rendezvous(0);
+ }
+ }
+};
+
+struct Any : Rendezvous<bool, bool> {
+ Any(size_t n) : Rendezvous<bool, bool>(n) {}
+ void mingle() override {
+ bool result = false;
+ for (size_t i = 0; i < size(); ++i) {
+ result |= in(i);
+ }
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+ }
+ bool check(bool flag) { return this->rendezvous(flag); }
+};
+
TEST("require that creating an empty rendezvous will fail") {
- EXPECT_EXCEPTION(Add(0), IllegalArgumentException, "");
+ EXPECT_EXCEPTION(Add<false>(0), IllegalArgumentException, "");
+ EXPECT_EXCEPTION(Add<true>(0), IllegalArgumentException, "");
}
-TEST_F("require that a single thread can mingle with itself within a rendezvous", Add(1)) {
+TEST_FF("require that a single thread can mingle with itself within a rendezvous", Add<false>(1), Add<true>(1)) {
EXPECT_EQUAL(10u, f1.rendezvous(10).first);
EXPECT_EQUAL(20u, f1.rendezvous(20).first);
EXPECT_EQUAL(30u, f1.rendezvous(30).first);
+ EXPECT_EQUAL(10u, f2.rendezvous(10, thread_id).first);
+ EXPECT_EQUAL(20u, f2.rendezvous(20, thread_id).first);
+ EXPECT_EQUAL(30u, f2.rendezvous(30, thread_id).first);
}
-TEST_MT_F("require that rendezvous can mingle multiple threads", 10, Add(num_threads)) {
+TEST_MT_FF("require that rendezvous can mingle multiple threads", 10, Add<false>(num_threads), Add<true>(num_threads)) {
EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
+ EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first);
}
-typedef Empty<Value> Empty1;
-typedef Empty<size_t> Empty2;
-TEST_MT_FF("require that unset rendezvous outputs are default constructed", 10, Empty1(num_threads), Empty2(num_threads)) {
- EXPECT_EQUAL(42u, f1.meet().value);
- EXPECT_EQUAL(0u, f2.meet());
+template <bool ext_id> using Empty1 = Empty<Value, ext_id>;
+template <bool ext_id> using Empty2 = Empty<size_t, ext_id>;
+
+TEST_MT_FFFF("require that unset rendezvous outputs are default constructed", 10,
+ Empty1<false>(num_threads), Empty2<false>(num_threads),
+ Empty1<true>(num_threads), Empty2<true>(num_threads))
+{
+ EXPECT_EQUAL(42u, f1.meet(thread_id).value);
+ EXPECT_EQUAL(0u, f2.meet(thread_id));
+ EXPECT_EQUAL(42u, f3.meet(thread_id).value);
+ EXPECT_EQUAL(0u, f4.meet(thread_id));
}
-TEST_MT_FF("require that mingle is not called until all threads are present", 3, Add(num_threads),
- CountDownLatch(num_threads - 1))
+TEST_MT_FFFF("require that mingle is not called until all threads are present", 3,
+ Add<false>(num_threads), CountDownLatch(num_threads - 1),
+ Add<true>(num_threads), CountDownLatch(num_threads - 1))
{
- if (thread_id == 0) {
- EXPECT_FALSE(f2.await(20));
- EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first);
- EXPECT_TRUE(f2.await(25000));
- } else {
- EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first);
- f2.countDown();
+ for (bool ext_id: {false, true}) {
+ CountDownLatch &latch = ext_id ? f4 : f2;
+ if (thread_id == 0) {
+ EXPECT_FALSE(latch.await(20));
+ if (ext_id) {
+ EXPECT_EQUAL(3u, f3.rendezvous(thread_id, thread_id).first);
+ } else {
+ EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first);
+ }
+ EXPECT_TRUE(latch.await(25000));
+ } else {
+ if (ext_id) {
+ EXPECT_EQUAL(3u, f3.rendezvous(thread_id, thread_id).first);
+ } else {
+ EXPECT_EQUAL(3u, f1.rendezvous(thread_id).first);
+ }
+ latch.countDown();
+ }
}
}
-TEST_MT_F("require that rendezvous can be used multiple times", 10, Add(num_threads)) {
- EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
- EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
+TEST_MT_FF("require that rendezvous can be used multiple times", 10, Add<false>(num_threads), Add<true>(num_threads)) {
EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
+ EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first);
EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
+ EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first);
EXPECT_EQUAL(45u, f1.rendezvous(thread_id).first);
+ EXPECT_EQUAL(45u, f2.rendezvous(thread_id, thread_id).first);
}
-TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Add(10), CountDownLatch(10)) {
+TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Add<false>(10), CountDownLatch(10)) {
auto res = f1.rendezvous(thread_id);
TEST_BARRIER();
if (res.second == thread_id) {
@@ -105,16 +186,46 @@ TEST_MT_FF("require that rendezvous can be run with additional threads", 100, Ad
EXPECT_TRUE(f2.await(25000));
}
-TEST_MT_F("require that mingle can modify its own copy of input values", 10, Modify(num_threads)) {
+TEST_MT_FF("require that mingle can modify its own copy of input values", 10, Modify<false>(num_threads), Modify<true>(num_threads)) {
size_t my_input = thread_id;
- size_t my_output = f1.rendezvous(my_input);
+ size_t my_output1 = f1.rendezvous(my_input);
+ size_t my_output2 = f2.rendezvous(my_input, thread_id);
EXPECT_EQUAL(my_input, thread_id);
- EXPECT_EQUAL(my_output, thread_id + 1);
+ EXPECT_EQUAL(my_output1, thread_id + 1);
+ EXPECT_EQUAL(my_output2, thread_id + 1);
}
-TEST_MT_F("require that threads can exchange non-copyable state", 2, Swap<std::unique_ptr<size_t> >()) {
- auto other = f1.rendezvous(std::make_unique<size_t>(thread_id));
- EXPECT_EQUAL(*other, 1 - thread_id);
+using Swap_false = Swap<std::unique_ptr<size_t>,false>;
+using Swap_true = Swap<std::unique_ptr<size_t>,true>;
+
+TEST_MT_FF("require that threads can exchange non-copyable state", 2, Swap_false(), Swap_true()) {
+ auto other1 = f1.rendezvous(std::make_unique<size_t>(thread_id));
+ EXPECT_EQUAL(*other1, 1 - thread_id);
+ auto other2 = f2.rendezvous(std::make_unique<size_t>(thread_id), thread_id);
+ EXPECT_EQUAL(*other2, 1 - thread_id);
+}
+
+TEST_MT_F("require that participation id can be explicitly defined", 10, DetectId<true>(num_threads)) {
+ for (size_t i = 0; i < 128; ++i) {
+ size_t my_id = f1.meet(thread_id);
+ EXPECT_EQUAL(my_id, thread_id);
+ }
+}
+
+TEST_MT_FF("require that participation id is unstable when not explicitly defined", 10, DetectId<false>(num_threads), Any(num_threads)) {
+ bool id_mismatch = false;
+ size_t old_id = f1.meet(thread_id);
+ for (size_t i = 0; !id_mismatch; ++i) {
+ if ((i % num_threads) == thread_id) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(i));
+ }
+ size_t new_id = f1.meet(thread_id);
+ if (new_id != old_id) {
+ id_mismatch = true;
+ }
+ id_mismatch = f2.check(id_mismatch);
+ }
+ EXPECT_TRUE(id_mismatch);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.h b/vespalib/src/vespa/vespalib/util/rendezvous.h
index be589f57b9d..e7259117272 100644
--- a/vespalib/src/vespa/vespalib/util/rendezvous.h
+++ b/vespalib/src/vespa/vespalib/util/rendezvous.h
@@ -2,6 +2,7 @@
#pragma once
+#include <type_traits>
#include <condition_variable>
#include <vector>
@@ -18,7 +19,7 @@ namespace vespalib {
* subclass needs to implement the mingle function to supply the
* application logic.
**/
-template <typename IN, typename OUT>
+template <typename IN, typename OUT, bool external_id = false>
class Rendezvous
{
private:
@@ -36,6 +37,17 @@ private:
**/
virtual void mingle() = 0;
+ /**
+ * lock-free version for when there is only one thread meeting
+ * itself.
+ **/
+ void meet_self(IN &input, OUT &output);
+
+ /**
+ * general version for when there are multiple threads meeting.
+ **/
+ void meet_others(IN &input, OUT &output, size_t my_id, std::unique_lock<std::mutex> guard);
+
protected:
/**
* Obtain the number of input and output values to be handled by
@@ -81,7 +93,22 @@ public:
* @return output parameter for a single thread
* @param input input parameter for a single thread
**/
- OUT rendezvous(IN input);
+ template <bool ext_id = external_id>
+ typename std::enable_if<!ext_id,OUT>::type rendezvous(IN input);
+
+ /**
+ * Called by individual threads to synchronize execution and share
+ * state with the mingle function where each caller has a
+ * pre-defined participation id (enable by setting the external_id
+ * template flag).
+ *
+ * @return output parameter for a single thread
+ * @param input input parameter for a single thread
+ * @param my_id participant id for this thread (must be in range and
+ * not conflicting with other threads)
+ **/
+ template <bool ext_id = external_id>
+ typename std::enable_if<ext_id,OUT>::type rendezvous(IN input, size_t my_id);
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.hpp b/vespalib/src/vespa/vespalib/util/rendezvous.hpp
index 2af5a55c8ab..284b536460a 100644
--- a/vespalib/src/vespa/vespalib/util/rendezvous.hpp
+++ b/vespalib/src/vespa/vespalib/util/rendezvous.hpp
@@ -1,52 +1,91 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "exceptions.h"
+#include <cassert>
namespace vespalib {
-template <typename IN, typename OUT>
-Rendezvous<IN, OUT>::Rendezvous(size_t n)
+template <typename IN, typename OUT, bool external_id>
+void
+Rendezvous<IN, OUT, external_id>::meet_self(IN &input, OUT &output) {
+ _in[0] = &input;
+ _out[0] = &output;
+ mingle();
+}
+
+template <typename IN, typename OUT, bool external_id>
+void
+Rendezvous<IN, OUT, external_id>::meet_others(IN &input, OUT &output, size_t my_id, std::unique_lock<std::mutex> guard)
+{
+ if (external_id) {
+ assert(_in[my_id] == nullptr);
+ assert(_out[my_id] == nullptr);
+ }
+ _in[my_id] = &input;
+ _out[my_id] = &output;
+ if (++_next == _size) {
+ mingle();
+ if (external_id) {
+ std::fill(_in.begin(), _in.end(), nullptr);
+ std::fill(_out.begin(), _out.end(), nullptr);
+ }
+ _next = 0;
+ ++_gen;
+ _cond.notify_all();
+ } else {
+ size_t oldgen = _gen;
+ while (oldgen == _gen) {
+ _cond.wait(guard);
+ }
+ }
+}
+
+template <typename IN, typename OUT, bool external_id>
+Rendezvous<IN, OUT, external_id>::Rendezvous(size_t n)
: _lock(),
_cond(),
_size(n),
_next(0),
_gen(0),
- _in(n, 0),
- _out(n, 0)
+ _in(n, nullptr),
+ _out(n, nullptr)
{
if (n == 0) {
throw IllegalArgumentException("size must be greater than 0");
}
}
-template <typename IN, typename OUT>
-Rendezvous<IN, OUT>::~Rendezvous() = default;
+template <typename IN, typename OUT, bool external_id>
+Rendezvous<IN, OUT, external_id>::~Rendezvous() = default;
-template <typename IN, typename OUT>
-OUT
-Rendezvous<IN, OUT>::rendezvous(IN input)
+template <typename IN, typename OUT, bool external_id>
+template <bool ext_id>
+typename std::enable_if<!ext_id,OUT>::type
+Rendezvous<IN, OUT, external_id>::rendezvous(IN input)
{
- OUT ret = OUT();
+ OUT ret{};
+ static_assert(ext_id == external_id);
if (_size == 1) {
- _in[0] = &input;
- _out[0] = &ret;
- mingle();
+ meet_self(input, ret);
} else {
std::unique_lock guard(_lock);
- size_t me = _next++;
- _in[me] = &input;
- _out[me] = &ret;
- if (_next == _size) {
- mingle();
- _next = 0;
- ++_gen;
- _cond.notify_all();
- } else {
- size_t oldgen = _gen;
- while (oldgen == _gen) {
- _cond.wait(guard);
- }
- }
+ meet_others(input, ret, _next, std::move(guard));
+ }
+ return ret;
+}
+
+template <typename IN, typename OUT, bool external_id>
+template <bool ext_id>
+typename std::enable_if<ext_id,OUT>::type
+Rendezvous<IN, OUT, external_id>::rendezvous(IN input, size_t my_id)
+{
+ OUT ret{};
+ assert(my_id < _size);
+ static_assert(ext_id == external_id);
+ if (_size == 1) {
+ meet_self(input, ret);
+ } else {
+ meet_others(input, ret, my_id, std::unique_lock(_lock));
}
return ret;
}