aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-12-03 13:13:23 +0100
committerGitHub <noreply@github.com>2018-12-03 13:13:23 +0100
commit97cf964467aa3879efe1393ad4f49cffe60b3bf7 (patch)
tree4dcb26c3a083a9a330c0a3bf40b7d31fc82a96af
parent523f82a4a441ef60db6ef741bc0673c1e4214730 (diff)
parent9fcf6a80476e569a9ab2bf21f4615bb350c0f941 (diff)
Merge pull request #7822 from vespa-engine/havardpe/latch
Havardpe/latch
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp30
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/latch/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/latch/latch_test.cpp99
-rw-r--r--vespalib/src/tests/portal/portal_test.cpp84
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt7
-rw-r--r--vespalib/src/vespa/vespalib/util/latch.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/latch.h59
8 files changed, 233 insertions, 62 deletions
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index dd08f365d58..80b17fba7db 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -2,6 +2,7 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/net/socket_spec.h>
#include <vespa/vespalib/util/benchmark_timer.h>
+#include <vespa/vespalib/util/latch.h>
#include <vespa/fnet/frt/frt.h>
#include <mutex>
#include <condition_variable>
@@ -21,30 +22,13 @@ vespalib::CryptoEngine::SP crypto;
class RequestLatch : public FRT_IRequestWait {
private:
- FRT_RPCRequest *_req;
- std::mutex _lock;
- std::condition_variable _cond;
+ vespalib::Latch<FRT_RPCRequest*> _latch;
public:
- RequestLatch() : _req(nullptr), _lock(), _cond() {}
- ~RequestLatch() { ASSERT_TRUE(_req == nullptr); }
- bool has_req() {
- std::lock_guard guard(_lock);
- return (_req != nullptr);
- }
- FRT_RPCRequest *read() {
- std::unique_lock guard(_lock);
- _cond.wait(guard, [&req = _req]{ return (req != nullptr); });
- auto ret = _req;
- _req = nullptr;
- _cond.notify_all();
- return ret;
- }
- void write(FRT_RPCRequest *req) {
- std::unique_lock guard(_lock);
- _cond.wait(guard, [&req = _req]{ return (req == nullptr); });
- _req = req;
- _cond.notify_all();
- }
+ RequestLatch() : _latch() {}
+ ~RequestLatch() { ASSERT_TRUE(!has_req()); }
+ bool has_req() { return _latch.has_value(); }
+ FRT_RPCRequest *read() { return _latch.read(); }
+ void write(FRT_RPCRequest *req) { _latch.write(req); }
void RequestDone(FRT_RPCRequest *req) override { write(req); }
};
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index 8bd3dda4e5a..fb60cc66931 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -50,6 +50,7 @@ vespa_define_module(
src/tests/host_name
src/tests/io/fileutil
src/tests/io/mapped_file_input
+ src/tests/latch
src/tests/left_right_heap
src/tests/make_fixture_macros
src/tests/memory
diff --git a/vespalib/src/tests/latch/CMakeLists.txt b/vespalib/src/tests/latch/CMakeLists.txt
new file mode 100644
index 00000000000..57a67d74868
--- /dev/null
+++ b/vespalib/src/tests/latch/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_latch_test_app TEST
+ SOURCES
+ latch_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_latch_test_app COMMAND vespalib_latch_test_app)
diff --git a/vespalib/src/tests/latch/latch_test.cpp b/vespalib/src/tests/latch/latch_test.cpp
new file mode 100644
index 00000000000..f29b673e508
--- /dev/null
+++ b/vespalib/src/tests/latch/latch_test.cpp
@@ -0,0 +1,99 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/latch.h>
+
+using namespace vespalib;
+
+TEST("require that write then read works") {
+ Latch<int> latch;
+ EXPECT_TRUE(!latch.has_value());
+ latch.write(42);
+ EXPECT_TRUE(latch.has_value());
+ EXPECT_EQUAL(latch.read(), 42);
+ EXPECT_TRUE(!latch.has_value());
+}
+
+TEST_MT_FFF("require that read waits for write", 2, Latch<int>(), Gate(), TimeBomb(60)) {
+ if (thread_id == 0) {
+ EXPECT_TRUE(!f2.await(10));
+ f1.write(123);
+ EXPECT_TRUE(f2.await(60000));
+ } else {
+ EXPECT_EQUAL(f1.read(), 123);
+ f2.countDown();
+ }
+}
+
+TEST_MT_FFF("require that write waits for read", 2, Latch<int>(), Gate(), TimeBomb(60)) {
+ if (thread_id == 0) {
+ f1.write(123);
+ f1.write(456);
+ f2.countDown();
+ } else {
+ EXPECT_TRUE(!f2.await(10));
+ EXPECT_EQUAL(f1.read(), 123);
+ EXPECT_TRUE(f2.await(60000));
+ EXPECT_EQUAL(f1.read(), 456);
+ }
+}
+
+struct MyInt {
+ int value;
+ MyInt(int value_in) : value(value_in) {}
+ MyInt(MyInt &&rhs) = default;
+ MyInt(const MyInt &rhs) = delete;
+ MyInt &operator=(const MyInt &rhs) = delete;
+ MyInt &operator=(MyInt &&rhs) = delete;
+};
+
+TEST("require that un-assignable non-default-constructable move-only objects can be used") {
+ Latch<MyInt> latch;
+ latch.write(MyInt(1337));
+ EXPECT_EQUAL(latch.read().value, 1337);
+}
+
+struct MyObj {
+ static int total;
+ int *with_state;
+ MyObj(int &with_state_in) : with_state(&with_state_in) {}
+ MyObj(MyObj &&rhs) {
+ with_state = rhs.with_state;
+ rhs.with_state = nullptr;
+ }
+ void detach() { with_state = nullptr; }
+ ~MyObj() {
+ ++total;
+ if (with_state) {
+ ++(*with_state);
+ }
+ }
+ MyObj(const MyObj &rhs) = delete;
+ MyObj &operator=(const MyObj &rhs) = delete;
+ MyObj &operator=(MyObj &&rhs) = delete;
+};
+int MyObj::total = 0;
+
+TEST("require that latched objects are appropriately destructed") {
+ int with_state = 0;
+ int total_sample = 0;
+ {
+ Latch<MyObj> latch1;
+ Latch<MyObj> latch2;
+ Latch<MyObj> latch3;
+ latch2.write(MyObj(with_state));
+ latch3.write(MyObj(with_state));
+ latch2.read().detach();
+ EXPECT_TRUE(!latch1.has_value());
+ EXPECT_TRUE(!latch2.has_value());
+ EXPECT_TRUE(latch3.has_value());
+ EXPECT_EQUAL(with_state, 0);
+ EXPECT_GREATER_EQUAL(MyObj::total, 1);
+ total_sample = MyObj::total;
+ }
+ EXPECT_EQUAL(MyObj::total, total_sample + 1);
+ EXPECT_EQUAL(with_state, 1);
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/portal/portal_test.cpp b/vespalib/src/tests/portal/portal_test.cpp
index cac4b78d3e2..a277914bef5 100644
--- a/vespalib/src/tests/portal/portal_test.cpp
+++ b/vespalib/src/tests/portal/portal_test.cpp
@@ -10,7 +10,7 @@
#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
#include <vespa/vespalib/net/tls/maybe_tls_crypto_engine.h>
#include <vespa/vespalib/test/make_tls_options_for_testing.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/util/latch.h>
using namespace vespalib;
@@ -161,29 +161,6 @@ TEST("require that get requests dropped on the floor returns HTTP error") {
EXPECT_EQUAL(result, expect);
}
-struct GetTask : public Executor::Task {
- Portal::GetRequest request;
- GetTask(Portal::GetRequest request_in) : request(std::move(request_in)) {}
- void run() override {
- request.respond_with_content("text/plain", "hello");
- }
-};
-
-TEST("require that GET requests can be completed in another thread") {
- vespalib::string path = "/test";
- ThreadStackExecutor executor(1, 128 * 1024);
- auto portal = Portal::create(null_crypto(), 0);
- auto expect = make_expected_response("text/plain", "hello");
- MyGetHandler handler([&executor](Portal::GetRequest request)
- {
- executor.execute(std::make_unique<GetTask>(std::move(request)));
- });
- auto bound = portal->bind(path, handler);
- auto result = fetch(portal->listen_port(), null_crypto(), path);
- EXPECT_EQUAL(result, expect);
- executor.shutdown().sync();
-}
-
TEST("require that bogus request returns HTTP error") {
auto portal = Portal::create(null_crypto(), 0);
auto expect = make_expected_error(400, "Bad Request");
@@ -225,7 +202,7 @@ TEST("require that newer handlers with the same prefix shadows older ones") {
EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1"));
}
-TEST("require that connection errors do not block shutdown by leaking resources (also tests tight shutdown timing)") {
+TEST("require that connection errors do not block shutdown by leaking resources") {
MyGetHandler handler([](Portal::GetRequest request)
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
@@ -257,35 +234,70 @@ TEST("require that connection errors do not block shutdown by leaking resources
}
}
-struct WaitingFixture {
+struct LatchedFixture {
Portal::SP portal;
- Gate enter_callback;
- Gate exit_callback;
MyGetHandler handler;
Portal::Token::UP bound;
- WaitingFixture() : portal(Portal::create(null_crypto(), 0)),
- enter_callback(),
- exit_callback(),
+ Gate enter_callback;
+ Latch<Portal::GetRequest> latch;
+ Gate exit_callback;
+ LatchedFixture() : portal(Portal::create(null_crypto(), 0)),
handler([this](Portal::GetRequest request)
{
enter_callback.countDown();
- request.respond_with_content("application/json", "[1,2,3]");
+ latch.write(std::move(request));
exit_callback.await();
}),
- bound(portal->bind("/test", handler)) {}
+ bound(portal->bind("/test", handler)),
+ enter_callback(), latch(), exit_callback() {}
};
-TEST_MT_FFF("require that bind token destruction waits for active requests", 3,
- WaitingFixture(), Gate(), TimeBomb(60))
+TEST_MT_FF("require that GET requests can be completed in another thread", 2,
+ LatchedFixture(), TimeBomb(60))
{
if (thread_id == 0) {
- f1.enter_callback.await();
+ Portal::GetRequest req = f1.latch.read();
+ f1.exit_callback.countDown();
+ std::this_thread::sleep_for(std::chrono::milliseconds(5));
+ req.respond_with_content("text/plain", "hello");
+ } else {
+ auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test");
+ EXPECT_EQUAL(result, make_expected_response("text/plain", "hello"));
+ }
+}
+
+TEST_MT_FFF("require that bind token destruction waits for active callbacks", 3,
+ LatchedFixture(), Gate(), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ Portal::GetRequest req = f1.latch.read();
EXPECT_TRUE(!f2.await(20));
f1.exit_callback.countDown();
EXPECT_TRUE(f2.await(60000));
+ req.respond_with_content("application/json", "[1,2,3]");
+ } else if (thread_id == 1) {
+ f1.enter_callback.await();
+ f1.bound.reset();
+ f2.countDown();
+ } else {
+ auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test");
+ EXPECT_EQUAL(result, make_expected_response("application/json", "[1,2,3]"));
+ }
+}
+
+TEST_MT_FFF("require that portal destruction waits for request completion", 3,
+ LatchedFixture(), Gate(), TimeBomb(60))
+{
+ if (thread_id == 0) {
+ Portal::GetRequest req = f1.latch.read();
+ f1.exit_callback.countDown();
+ EXPECT_TRUE(!f2.await(20));
+ req.respond_with_content("application/json", "[1,2,3]");
+ EXPECT_TRUE(f2.await(60000));
} else if (thread_id == 1) {
f1.enter_callback.await();
f1.bound.reset();
+ f1.portal.reset();
f2.countDown();
} else {
auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test");
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 9ce28d62e18..fdf7524c82f 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
hdr_abort.cpp
host_name.cpp
joinable.cpp
+ latch.cpp
left_right_heap.cpp
lz4compressor.cpp
md5.c
@@ -42,12 +43,12 @@ vespa_add_library(vespalib_vespalib_util OBJECT
simple_thread_bundle.cpp
slaveproc.cpp
stash.cpp
- stringfmt.cpp
string_hash.cpp
- thread_bundle.cpp
+ stringfmt.cpp
thread.cpp
- threadstackexecutorbase.cpp
+ thread_bundle.cpp
threadstackexecutor.cpp
+ threadstackexecutorbase.cpp
time_tracker.cpp
valgrind.cpp
zstdcompressor.cpp
diff --git a/vespalib/src/vespa/vespalib/util/latch.cpp b/vespalib/src/vespa/vespalib/util/latch.cpp
new file mode 100644
index 00000000000..3c86b4d214e
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/latch.cpp
@@ -0,0 +1,7 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "latch.h"
+
+namespace vespalib {
+
+} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/latch.h b/vespalib/src/vespa/vespalib/util/latch.h
new file mode 100644
index 00000000000..73336111c0c
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/latch.h
@@ -0,0 +1,59 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <mutex>
+#include <condition_variable>
+#include <cassert>
+
+namespace vespalib {
+
+/**
+ * A latch acts like a blocking queue where the maximum capacity is a
+ * single element. It enables directional exchange of data where reads
+ * and writes are alternating.
+ **/
+template <typename T>
+class Latch {
+private:
+ std::mutex _lock;
+ std::condition_variable _cond;
+ char _space[sizeof(T)];
+ bool _has_value;
+
+ void *as_void() { return &_space[0]; }
+ T *as_value() { return (T*)as_void(); }
+public:
+ Latch() : _lock(), _cond(), _space(), _has_value(false) {}
+ ~Latch() {
+ if (_has_value) {
+ as_value()->~T();
+ }
+ }
+ bool has_value() {
+ std::lock_guard guard(_lock);
+ return _has_value;
+ }
+ T read() {
+ std::unique_lock guard(_lock);
+ while (!_has_value) {
+ _cond.wait(guard);
+ }
+ T value = std::move(*as_value());
+ as_value()->~T();
+ _has_value = false;
+ _cond.notify_all();
+ return value;
+ }
+ void write(T value) {
+ std::unique_lock guard(_lock);
+ while (_has_value) {
+ _cond.wait(guard);
+ }
+ new (as_void()) T(std::move(value));
+ _has_value = true;
+ _cond.notify_all();
+ }
+};
+
+} // namespace vespalib