diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2018-12-03 13:13:23 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-12-03 13:13:23 +0100 |
commit | 97cf964467aa3879efe1393ad4f49cffe60b3bf7 (patch) | |
tree | 4dcb26c3a083a9a330c0a3bf40b7d31fc82a96af | |
parent | 523f82a4a441ef60db6ef741bc0673c1e4214730 (diff) | |
parent | 9fcf6a80476e569a9ab2bf21f4615bb350c0f941 (diff) |
Merge pull request #7822 from vespa-engine/havardpe/latch
Havardpe/latch
-rw-r--r-- | fnet/src/tests/frt/rpc/invoke.cpp | 30 | ||||
-rw-r--r-- | vespalib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/tests/latch/CMakeLists.txt | 8 | ||||
-rw-r--r-- | vespalib/src/tests/latch/latch_test.cpp | 99 | ||||
-rw-r--r-- | vespalib/src/tests/portal/portal_test.cpp | 84 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/CMakeLists.txt | 7 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/latch.cpp | 7 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/latch.h | 59 |
8 files changed, 233 insertions, 62 deletions
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index dd08f365d58..80b17fba7db 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -2,6 +2,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/net/socket_spec.h> #include <vespa/vespalib/util/benchmark_timer.h> +#include <vespa/vespalib/util/latch.h> #include <vespa/fnet/frt/frt.h> #include <mutex> #include <condition_variable> @@ -21,30 +22,13 @@ vespalib::CryptoEngine::SP crypto; class RequestLatch : public FRT_IRequestWait { private: - FRT_RPCRequest *_req; - std::mutex _lock; - std::condition_variable _cond; + vespalib::Latch<FRT_RPCRequest*> _latch; public: - RequestLatch() : _req(nullptr), _lock(), _cond() {} - ~RequestLatch() { ASSERT_TRUE(_req == nullptr); } - bool has_req() { - std::lock_guard guard(_lock); - return (_req != nullptr); - } - FRT_RPCRequest *read() { - std::unique_lock guard(_lock); - _cond.wait(guard, [&req = _req]{ return (req != nullptr); }); - auto ret = _req; - _req = nullptr; - _cond.notify_all(); - return ret; - } - void write(FRT_RPCRequest *req) { - std::unique_lock guard(_lock); - _cond.wait(guard, [&req = _req]{ return (req == nullptr); }); - _req = req; - _cond.notify_all(); - } + RequestLatch() : _latch() {} + ~RequestLatch() { ASSERT_TRUE(!has_req()); } + bool has_req() { return _latch.has_value(); } + FRT_RPCRequest *read() { return _latch.read(); } + void write(FRT_RPCRequest *req) { _latch.write(req); } void RequestDone(FRT_RPCRequest *req) override { write(req); } }; diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index 8bd3dda4e5a..fb60cc66931 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -50,6 +50,7 @@ vespa_define_module( src/tests/host_name src/tests/io/fileutil src/tests/io/mapped_file_input + src/tests/latch src/tests/left_right_heap src/tests/make_fixture_macros src/tests/memory diff --git a/vespalib/src/tests/latch/CMakeLists.txt b/vespalib/src/tests/latch/CMakeLists.txt new file mode 100644 index 00000000000..57a67d74868 --- /dev/null +++ b/vespalib/src/tests/latch/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_latch_test_app TEST + SOURCES + latch_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_latch_test_app COMMAND vespalib_latch_test_app) diff --git a/vespalib/src/tests/latch/latch_test.cpp b/vespalib/src/tests/latch/latch_test.cpp new file mode 100644 index 00000000000..f29b673e508 --- /dev/null +++ b/vespalib/src/tests/latch/latch_test.cpp @@ -0,0 +1,99 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/latch.h> + +using namespace vespalib; + +TEST("require that write then read works") { + Latch<int> latch; + EXPECT_TRUE(!latch.has_value()); + latch.write(42); + EXPECT_TRUE(latch.has_value()); + EXPECT_EQUAL(latch.read(), 42); + EXPECT_TRUE(!latch.has_value()); +} + +TEST_MT_FFF("require that read waits for write", 2, Latch<int>(), Gate(), TimeBomb(60)) { + if (thread_id == 0) { + EXPECT_TRUE(!f2.await(10)); + f1.write(123); + EXPECT_TRUE(f2.await(60000)); + } else { + EXPECT_EQUAL(f1.read(), 123); + f2.countDown(); + } +} + +TEST_MT_FFF("require that write waits for read", 2, Latch<int>(), Gate(), TimeBomb(60)) { + if (thread_id == 0) { + f1.write(123); + f1.write(456); + f2.countDown(); + } else { + EXPECT_TRUE(!f2.await(10)); + EXPECT_EQUAL(f1.read(), 123); + EXPECT_TRUE(f2.await(60000)); + EXPECT_EQUAL(f1.read(), 456); + } +} + +struct MyInt { + int value; + MyInt(int value_in) : value(value_in) {} + MyInt(MyInt &&rhs) = default; + MyInt(const MyInt &rhs) = delete; + MyInt &operator=(const MyInt &rhs) = delete; + MyInt &operator=(MyInt &&rhs) = delete; +}; + +TEST("require that un-assignable non-default-constructable move-only objects can be used") { + Latch<MyInt> latch; + latch.write(MyInt(1337)); + EXPECT_EQUAL(latch.read().value, 1337); +} + +struct MyObj { + static int total; + int *with_state; + MyObj(int &with_state_in) : with_state(&with_state_in) {} + MyObj(MyObj &&rhs) { + with_state = rhs.with_state; + rhs.with_state = nullptr; + } + void detach() { with_state = nullptr; } + ~MyObj() { + ++total; + if (with_state) { + ++(*with_state); + } + } + MyObj(const MyObj &rhs) = delete; + MyObj &operator=(const MyObj &rhs) = delete; + MyObj &operator=(MyObj &&rhs) = delete; +}; +int MyObj::total = 0; + +TEST("require that latched objects are appropriately destructed") { + int with_state = 0; + int total_sample = 0; + { + Latch<MyObj> latch1; + Latch<MyObj> latch2; + Latch<MyObj> latch3; + latch2.write(MyObj(with_state)); + latch3.write(MyObj(with_state)); + latch2.read().detach(); + EXPECT_TRUE(!latch1.has_value()); + EXPECT_TRUE(!latch2.has_value()); + EXPECT_TRUE(latch3.has_value()); + EXPECT_EQUAL(with_state, 0); + EXPECT_GREATER_EQUAL(MyObj::total, 1); + total_sample = MyObj::total; + } + EXPECT_EQUAL(MyObj::total, total_sample + 1); + EXPECT_EQUAL(with_state, 1); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/portal/portal_test.cpp b/vespalib/src/tests/portal/portal_test.cpp index cac4b78d3e2..a277914bef5 100644 --- a/vespalib/src/tests/portal/portal_test.cpp +++ b/vespalib/src/tests/portal/portal_test.cpp @@ -10,7 +10,7 @@ #include <vespa/vespalib/net/tls/tls_crypto_engine.h> #include <vespa/vespalib/net/tls/maybe_tls_crypto_engine.h> #include <vespa/vespalib/test/make_tls_options_for_testing.h> -#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/latch.h> using namespace vespalib; @@ -161,29 +161,6 @@ TEST("require that get requests dropped on the floor returns HTTP error") { EXPECT_EQUAL(result, expect); } -struct GetTask : public Executor::Task { - Portal::GetRequest request; - GetTask(Portal::GetRequest request_in) : request(std::move(request_in)) {} - void run() override { - request.respond_with_content("text/plain", "hello"); - } -}; - -TEST("require that GET requests can be completed in another thread") { - vespalib::string path = "/test"; - ThreadStackExecutor executor(1, 128 * 1024); - auto portal = Portal::create(null_crypto(), 0); - auto expect = make_expected_response("text/plain", "hello"); - MyGetHandler handler([&executor](Portal::GetRequest request) - { - executor.execute(std::make_unique<GetTask>(std::move(request))); - }); - auto bound = portal->bind(path, handler); - auto result = fetch(portal->listen_port(), null_crypto(), path); - EXPECT_EQUAL(result, expect); - executor.shutdown().sync(); -} - TEST("require that bogus request returns HTTP error") { auto portal = Portal::create(null_crypto(), 0); auto expect = make_expected_error(400, "Bad Request"); @@ -225,7 +202,7 @@ TEST("require that newer handlers with the same prefix shadows older ones") { EXPECT_EQUAL(fetch(portal->listen_port(), null_crypto(), "/foo"), make_expected_response("text/plain", "handler1")); } -TEST("require that connection errors do not block shutdown by leaking resources (also tests tight shutdown timing)") { +TEST("require that connection errors do not block shutdown by leaking resources") { MyGetHandler handler([](Portal::GetRequest request) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); @@ -257,35 +234,70 @@ TEST("require that connection errors do not block shutdown by leaking resources } } -struct WaitingFixture { +struct LatchedFixture { Portal::SP portal; - Gate enter_callback; - Gate exit_callback; MyGetHandler handler; Portal::Token::UP bound; - WaitingFixture() : portal(Portal::create(null_crypto(), 0)), - enter_callback(), - exit_callback(), + Gate enter_callback; + Latch<Portal::GetRequest> latch; + Gate exit_callback; + LatchedFixture() : portal(Portal::create(null_crypto(), 0)), handler([this](Portal::GetRequest request) { enter_callback.countDown(); - request.respond_with_content("application/json", "[1,2,3]"); + latch.write(std::move(request)); exit_callback.await(); }), - bound(portal->bind("/test", handler)) {} + bound(portal->bind("/test", handler)), + enter_callback(), latch(), exit_callback() {} }; -TEST_MT_FFF("require that bind token destruction waits for active requests", 3, - WaitingFixture(), Gate(), TimeBomb(60)) +TEST_MT_FF("require that GET requests can be completed in another thread", 2, + LatchedFixture(), TimeBomb(60)) { if (thread_id == 0) { - f1.enter_callback.await(); + Portal::GetRequest req = f1.latch.read(); + f1.exit_callback.countDown(); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + req.respond_with_content("text/plain", "hello"); + } else { + auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test"); + EXPECT_EQUAL(result, make_expected_response("text/plain", "hello")); + } +} + +TEST_MT_FFF("require that bind token destruction waits for active callbacks", 3, + LatchedFixture(), Gate(), TimeBomb(60)) +{ + if (thread_id == 0) { + Portal::GetRequest req = f1.latch.read(); EXPECT_TRUE(!f2.await(20)); f1.exit_callback.countDown(); EXPECT_TRUE(f2.await(60000)); + req.respond_with_content("application/json", "[1,2,3]"); + } else if (thread_id == 1) { + f1.enter_callback.await(); + f1.bound.reset(); + f2.countDown(); + } else { + auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test"); + EXPECT_EQUAL(result, make_expected_response("application/json", "[1,2,3]")); + } +} + +TEST_MT_FFF("require that portal destruction waits for request completion", 3, + LatchedFixture(), Gate(), TimeBomb(60)) +{ + if (thread_id == 0) { + Portal::GetRequest req = f1.latch.read(); + f1.exit_callback.countDown(); + EXPECT_TRUE(!f2.await(20)); + req.respond_with_content("application/json", "[1,2,3]"); + EXPECT_TRUE(f2.await(60000)); } else if (thread_id == 1) { f1.enter_callback.await(); f1.bound.reset(); + f1.portal.reset(); f2.countDown(); } else { auto result = fetch(f1.portal->listen_port(), null_crypto(), "/test"); diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 9ce28d62e18..fdf7524c82f 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -25,6 +25,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT hdr_abort.cpp host_name.cpp joinable.cpp + latch.cpp left_right_heap.cpp lz4compressor.cpp md5.c @@ -42,12 +43,12 @@ vespa_add_library(vespalib_vespalib_util OBJECT simple_thread_bundle.cpp slaveproc.cpp stash.cpp - stringfmt.cpp string_hash.cpp - thread_bundle.cpp + stringfmt.cpp thread.cpp - threadstackexecutorbase.cpp + thread_bundle.cpp threadstackexecutor.cpp + threadstackexecutorbase.cpp time_tracker.cpp valgrind.cpp zstdcompressor.cpp diff --git a/vespalib/src/vespa/vespalib/util/latch.cpp b/vespalib/src/vespa/vespalib/util/latch.cpp new file mode 100644 index 00000000000..3c86b4d214e --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/latch.cpp @@ -0,0 +1,7 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "latch.h" + +namespace vespalib { + +} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/latch.h b/vespalib/src/vespa/vespalib/util/latch.h new file mode 100644 index 00000000000..73336111c0c --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/latch.h @@ -0,0 +1,59 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <mutex> +#include <condition_variable> +#include <cassert> + +namespace vespalib { + +/** + * A latch acts like a blocking queue where the maximum capacity is a + * single element. It enables directional exchange of data where reads + * and writes are alternating. + **/ +template <typename T> +class Latch { +private: + std::mutex _lock; + std::condition_variable _cond; + char _space[sizeof(T)]; + bool _has_value; + + void *as_void() { return &_space[0]; } + T *as_value() { return (T*)as_void(); } +public: + Latch() : _lock(), _cond(), _space(), _has_value(false) {} + ~Latch() { + if (_has_value) { + as_value()->~T(); + } + } + bool has_value() { + std::lock_guard guard(_lock); + return _has_value; + } + T read() { + std::unique_lock guard(_lock); + while (!_has_value) { + _cond.wait(guard); + } + T value = std::move(*as_value()); + as_value()->~T(); + _has_value = false; + _cond.notify_all(); + return value; + } + void write(T value) { + std::unique_lock guard(_lock); + while (_has_value) { + _cond.wait(guard); + } + new (as_void()) T(std::move(value)); + _has_value = true; + _cond.notify_all(); + } +}; + +} // namespace vespalib |