diff options
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/.gitignore | 1 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/CMakeLists.txt | 7 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp | 136 | ||||
-rw-r--r-- | vespalib/src/tests/time_tracer/time_tracer_test.cpp | 27 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/time_tracer.cpp | 95 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/time_tracer.h | 54 |
6 files changed, 284 insertions, 36 deletions
diff --git a/fnet/src/tests/frt/parallel_rpc/.gitignore b/fnet/src/tests/frt/parallel_rpc/.gitignore index 7b4b7428e52..77560f68c5a 100644 --- a/fnet/src/tests/frt/parallel_rpc/.gitignore +++ b/fnet/src/tests/frt/parallel_rpc/.gitignore @@ -1 +1,2 @@ fnet_parallel_rpc_test_app +fnet_tls_rpc_bench_app diff --git a/fnet/src/tests/frt/parallel_rpc/CMakeLists.txt b/fnet/src/tests/frt/parallel_rpc/CMakeLists.txt index e7c29cfc9d5..6e9a7b4cd74 100644 --- a/fnet/src/tests/frt/parallel_rpc/CMakeLists.txt +++ b/fnet/src/tests/frt/parallel_rpc/CMakeLists.txt @@ -6,3 +6,10 @@ vespa_add_executable(fnet_parallel_rpc_test_app TEST fnet ) vespa_add_test(NAME fnet_parallel_rpc_test_app COMMAND fnet_parallel_rpc_test_app) +vespa_add_executable(fnet_tls_rpc_bench_app TEST + SOURCES + tls_rpc_bench.cpp + DEPENDS + fnet +) +vespa_add_test(NAME fnet_tls_rpc_bench_app COMMAND fnet_tls_rpc_bench_app BENCHMARK) diff --git a/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp new file mode 100644 index 00000000000..b3e613de93f --- /dev/null +++ b/fnet/src/tests/frt/parallel_rpc/tls_rpc_bench.cpp @@ -0,0 +1,136 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/util/benchmark_timer.h> +#include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/net/tls/tls_crypto_engine.h> +#include <vespa/vespalib/test/make_tls_options_for_testing.h> +#include <vespa/vespalib/test/time_tracer.h> +#include <vespa/fnet/frt/frt.h> +#include <thread> +#include <chrono> + +using namespace vespalib; + +using vespalib::test::TimeTracer; + +CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>(); +CryptoEngine::SP xor_crypto = std::make_shared<XorCryptoEngine>(); +CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); + +TT_Tag req_tag("request"); + +struct Fixture : FRT_Invokable { + FRT_Supervisor orb; + Fixture(CryptoEngine::SP crypto) : orb(std::move(crypto)) { + ASSERT_TRUE(orb.Listen(0)); + init_rpc(); + ASSERT_TRUE(orb.Start()); + } + FRT_Target *connect() { + return orb.GetTarget(orb.GetListenPort()); + } + ~Fixture() { + orb.ShutDown(true); + } + void init_rpc() { + FRT_ReflectionBuilder rb(&orb); + rb.DefineMethod("inc", "l", "l", FRT_METHOD(Fixture::rpc_inc), this); + rb.MethodDesc("increment a 64-bit integer"); + rb.ParamDesc("in", "an integer (64 bit)"); + rb.ReturnDesc("out", "in + 1 (64 bit)"); + } + void rpc_inc(FRT_RPCRequest *req) { + FRT_Values &params = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + ret.AddInt64(params[0]._intval64 + 1); + } +}; + +struct DurationCmp { + bool operator()(const TimeTracer::Record &a, const TimeTracer::Record &b) { + return ((a.stop - a.start) < (b.stop - b.start)); + } +}; + +struct StartCmp { + bool operator()(const TimeTracer::Record &a, const TimeTracer::Record &b) { + return (a.start < b.start); + } +}; + +void benchmark_rpc(Fixture &fixture, bool reconnect) { + uint64_t seq = 0; + FRT_Target *target = fixture.connect(); + FRT_RPCRequest *req = fixture.orb.AllocRPCRequest(); + auto invoke = [&seq, &target, &req, 
&fixture, reconnect](){ + TT_Sample sample(req_tag); + if (reconnect) { + target->SubRef(); + target = fixture.connect(); + } + req = fixture.orb.AllocRPCRequest(req); + req->SetMethodName("inc"); + req->GetParams()->AddInt64(seq); + target->InvokeSync(req, 60.0); + ASSERT_TRUE(req->CheckReturnTypes("l")); + uint64_t ret = req->GetReturn()->GetValue(0)._intval64; + EXPECT_EQUAL(ret, seq + 1); + seq = ret; + }; + auto before = TimeTracer::now(); + double t = BenchmarkTimer::benchmark(invoke, 5.0); + auto after = TimeTracer::now(); + target->SubRef(); + req->SubRef(); + auto stats = TimeTracer::extract().by_time(before, after).by_tag(req_tag.id()).get(); + ASSERT_TRUE(stats.size() > 0); + std::sort(stats.begin(), stats.end(), DurationCmp()); + auto med_sample = stats[stats.size() / 2]; + fprintf(stderr, "estimated min request latency: %g ms (reconnect = %s)\n", + (t * 1000.0), reconnect ? "yes":"no"); + fprintf(stderr, "actual median request latency: %g ms (reconnect = %s)\n", + med_sample.ms_duration(), reconnect ? 
"yes":"no"); + stats = TimeTracer::extract().by_time(med_sample.start, med_sample.stop).get(); + ASSERT_TRUE(stats.size() > 0); + std::sort(stats.begin(), stats.end(), StartCmp()); + fprintf(stderr, "===== time line BEGIN =====\n"); + for (const auto &entry: stats) { + double abs_start = std::chrono::duration<double, std::milli>(entry.start - med_sample.start).count(); + double abs_stop = std::chrono::duration<double, std::milli>(entry.stop - med_sample.start).count(); + fprintf(stderr, "[%g, %g] [%u:%s] %g ms\n", abs_start, abs_stop, entry.thread_id, entry.tag_name().c_str(), entry.ms_duration()); + } + fprintf(stderr, "===== time line END =====\n"); + std::sort(stats.begin(), stats.end(), DurationCmp()); + ASSERT_TRUE(stats.back().tag_id == req_tag.id()); + double rest_ms = stats.back().ms_duration(); + while (!stats.empty() && stats.back().ms_duration() > 1.0) { + const auto &entry = stats.back(); + if (entry.tag_id != req_tag.id()) { + fprintf(stderr, "WARNING: high duration: [%u:%s] %g ms\n", entry.thread_id, entry.tag_name().c_str(), entry.ms_duration()); + rest_ms -= entry.ms_duration(); + } + stats.pop_back(); + } + fprintf(stderr, "INFO: total non-critical overhead: %g ms\n", rest_ms); +} + +TEST_F("^^^-- rpc with null encryption", Fixture(null_crypto)) { + fprintf(stderr, "vvv-- rpc with null encryption\n"); + benchmark_rpc(f1, false); + benchmark_rpc(f1, true); +} + +TEST_F("^^^-- rpc with xor encryption", Fixture(xor_crypto)) { + fprintf(stderr, "vvv-- rpc with xor encryption\n"); + benchmark_rpc(f1, false); + benchmark_rpc(f1, true); +} + +TEST_F("^^^-- rpc with tls encryption", Fixture(tls_crypto)) { + fprintf(stderr, "vvv-- rpc with tls encryption\n"); + benchmark_rpc(f1, false); + benchmark_rpc(f1, true); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/time_tracer/time_tracer_test.cpp b/vespalib/src/tests/time_tracer/time_tracer_test.cpp index 835c3ffecbf..d7e2e0d579a 100644 --- 
a/vespalib/src/tests/time_tracer/time_tracer_test.cpp +++ b/vespalib/src/tests/time_tracer/time_tracer_test.cpp @@ -11,16 +11,15 @@ using vespalib::test::TimeTracer; TT_Tag tag0("tag0"); TT_Tag tag1("tag1"); TT_Tag tag2("tag2"); -TT_Tag tag1_too("tag1"); TT_Tag my_tag("my tag"); -TEST("require that tags are numbered by first creation time") { - EXPECT_EQUAL(tag0.id(), 0u); - EXPECT_EQUAL(tag1.id(), 1u); - EXPECT_EQUAL(tag2.id(), 2u); - EXPECT_EQUAL(tag1_too.id(), 1u); - EXPECT_EQUAL(my_tag.id(), 3u); +TEST("require that tag ids are equal if and only if tag names are equal") { + TT_Tag tag1_too("tag1"); + EXPECT_NOT_EQUAL(tag0.id(), tag1.id()); + EXPECT_NOT_EQUAL(tag1.id(), tag2.id()); + EXPECT_NOT_EQUAL(tag2.id(), tag0.id()); + EXPECT_EQUAL(tag1_too.id(), tag1.id()); } TEST_MT("require that threads are numbered by first sample", 3) { @@ -38,14 +37,14 @@ TEST_MT("require that threads are numbered by first sample", 3) { { TT_Sample sample(tag2); } } TEST_BARRIER(); // # 3 - auto list = TimeTracer::extract_all(); + auto list = TimeTracer::extract().get(); ASSERT_EQUAL(list.size(), 3u); EXPECT_EQUAL(list[0].thread_id, 0u); - EXPECT_EQUAL(list[0].tag_id, 0u); + EXPECT_EQUAL(list[0].tag_id, tag0.id()); EXPECT_EQUAL(list[1].thread_id, 1u); - EXPECT_EQUAL(list[1].tag_id, 1u); + EXPECT_EQUAL(list[1].tag_id, tag1.id()); EXPECT_EQUAL(list[2].thread_id, 2u); - EXPECT_EQUAL(list[2].tag_id, 2u); + EXPECT_EQUAL(list[2].tag_id, tag2.id()); } TEST("require that records are extracted inversely ordered by end time per thread") { @@ -53,11 +52,11 @@ TEST("require that records are extracted inversely ordered by end time per threa { TT_Sample s(my_tag); } { TT_Sample s(my_tag); } auto t = TimeTracer::now(); - auto list = TimeTracer::extract_all(); + auto list = TimeTracer::extract().get(); EXPECT_EQUAL(list.size(), 6u); size_t cnt = 0; for (const auto &item: list) { - if (item.tag_id == 3) { + if (item.tag_id == my_tag.id()) { ++cnt; EXPECT_TRUE(item.start <= item.stop); 
EXPECT_TRUE(item.stop <= t); @@ -73,7 +72,7 @@ TEST("benchmark time sampling") { fprintf(stderr, "min timestamp time: %g us\n", min_stamp_us); fprintf(stderr, "min sample time: %g us\n", min_sample_us); fprintf(stderr, "estimated non-clock overhead: %g us\n", (min_sample_us - (min_stamp_us * 2.0))); - auto list = TimeTracer::extract_all(); + auto list = TimeTracer::extract().get(); fprintf(stderr, "total samples after benchmarking: %zu\n", list.size()); EXPECT_GREATER(list.size(), 6u); } diff --git a/vespalib/src/vespa/vespalib/test/time_tracer.cpp b/vespalib/src/vespa/vespalib/test/time_tracer.cpp index 49f554d1ad2..b0801413871 100644 --- a/vespalib/src/vespa/vespalib/test/time_tracer.cpp +++ b/vespalib/src/vespa/vespalib/test/time_tracer.cpp @@ -28,6 +28,64 @@ TimeTracer::Tag::~Tag() = default; //----------------------------------------------------------------------------- +double +TimeTracer::Record::ms_duration() const +{ + return std::chrono::duration<double, std::milli>(stop - start).count(); +} + +vespalib::string +TimeTracer::Record::tag_name() const +{ + return master().get_tag_name(tag_id); +} + +//----------------------------------------------------------------------------- + +bool +TimeTracer::Extractor::keep(const Record &entry) const +{ + return ((!_by_thread || (entry.thread_id == _thread_id)) && + (!_by_tag || (entry.tag_id == _tag_id)) && + (!_by_time || (entry.stop > _a)) && + (!_by_time || (entry.start < _b))); +} + +TimeTracer::Extractor && +TimeTracer::Extractor::by_thread(uint32_t thread_id) && +{ + _by_thread = true; + _thread_id = thread_id; + return std::move(*this); +} + +TimeTracer::Extractor && +TimeTracer::Extractor::by_tag(uint32_t tag_id) && +{ + _by_tag = true; + _tag_id = tag_id; + return std::move(*this); +} + +TimeTracer::Extractor && +TimeTracer::Extractor::by_time(time_point a, time_point b) && +{ + _by_time = true; + _a = a; + _b = b; + return std::move(*this); +} + +std::vector<TimeTracer::Record> +TimeTracer::Extractor::get() 
const +{ + return master().extract_impl(*this); +} + +TimeTracer::Extractor::~Extractor() = default; + +//----------------------------------------------------------------------------- + void TimeTracer::init_thread_state() noexcept { @@ -39,7 +97,8 @@ TimeTracer::init_thread_state() noexcept TimeTracer::TimeTracer() : _lock(), _state_list(), - _tags() + _tags(), + _tag_names() { } @@ -53,26 +112,42 @@ TimeTracer::get_tag_id(const vespalib::string &tag_name) } uint32_t id = _tags.size(); _tags[tag_name] = id; + _tag_names.push_back(tag_name); return id; } +vespalib::string +TimeTracer::get_tag_name(uint32_t tag_id) +{ + std::lock_guard guard(_lock); + if (tag_id < _tag_names.size()) { + return _tag_names[tag_id]; + } else { + return "<undef>"; + } +} + TimeTracer::ThreadState * TimeTracer::create_thread_state() { std::lock_guard guard(_lock); - _state_list.push_back(std::make_unique<ThreadState>()); + uint32_t thread_id = _state_list.size(); + _state_list.push_back(std::make_unique<ThreadState>(thread_id)); return _state_list.back().get(); } std::vector<TimeTracer::Record> -TimeTracer::extract_all_impl() +TimeTracer::extract_impl(const Extractor &extractor) { std::lock_guard guard(_lock); std::vector<Record> list; - for (size_t thread_id = 0; thread_id < _state_list.size(); ++thread_id) { - const LogEntry *entry = _state_list[thread_id]->get_log_entries(); + for (const ThreadState::UP &state: _state_list) { + const LogEntry *entry = state->get_log_entries(); while (entry != nullptr) { - list.emplace_back(thread_id, entry->tag_id, entry->start, entry->stop); + Record record(state->thread_id(), entry->tag_id, entry->start, entry->stop); + if (extractor.keep(record)) { + list.push_back(record); + } entry = entry->next; } } @@ -83,12 +158,4 @@ TimeTracer::~TimeTracer() = default; //----------------------------------------------------------------------------- -std::vector<TimeTracer::Record> -TimeTracer::extract_all() -{ - return master().extract_all_impl(); -} - 
-//----------------------------------------------------------------------------- - } // namespace vespalib::test diff --git a/vespalib/src/vespa/vespalib/test/time_tracer.h b/vespalib/src/vespa/vespalib/test/time_tracer.h index 2a55e2134b3..c872ed84bbc 100644 --- a/vespalib/src/vespa/vespalib/test/time_tracer.h +++ b/vespalib/src/vespa/vespalib/test/time_tracer.h @@ -7,6 +7,7 @@ #include <chrono> #include <mutex> #include <atomic> +#include <thread> #include <map> namespace vespalib::test { @@ -82,6 +83,26 @@ public: time_point start_in, time_point stop_in) : thread_id(thread_id_in), tag_id(tag_id_in), start(start_in), stop(stop_in) {} + double ms_duration() const; + vespalib::string tag_name() const; + }; + + class Extractor { + private: + bool _by_thread; + uint32_t _thread_id; + bool _by_tag; + uint32_t _tag_id; + bool _by_time; + time_point _a; + time_point _b; + public: + bool keep(const Record &entry) const; + Extractor &&by_thread(uint32_t thread_id) &&; + Extractor &&by_tag(uint32_t tag_id) &&; + Extractor &&by_time(time_point a, time_point b) &&; + std::vector<Record> get() const; + ~Extractor(); }; private: @@ -93,20 +114,35 @@ private: LogEntry(uint32_t tag_id_in, time_point start_in, time_point stop_in, const LogEntry *next_in) : tag_id(tag_id_in), start(start_in), stop(stop_in), next(next_in) {} }; + + struct Guard { + std::atomic_flag &lock; + explicit Guard(std::atomic_flag &lock_in) noexcept : lock(lock_in) { + while (__builtin_expect(lock.test_and_set(std::memory_order_acquire), false)) { + std::this_thread::yield(); + } + } + ~Guard() noexcept { lock.clear(std::memory_order_release); } + }; + class ThreadState { private: + uint32_t _thread_id; + mutable std::atomic_flag _lock; vespalib::Stash _stash; - std::atomic<const LogEntry *> _list; + const LogEntry * _list; public: using UP = std::unique_ptr<ThreadState>; - ThreadState() : _stash(64 * 1024), _list(nullptr) {} + ThreadState(uint32_t thread_id) + : _thread_id(thread_id), 
_lock{ATOMIC_FLAG_INIT}, _stash(64 * 1024), _list(nullptr) {} + uint32_t thread_id() const { return _thread_id; } const LogEntry *get_log_entries() const { - return _list.load(std::memory_order_acquire); + Guard guard(_lock); + return _list; } void add_log_entry(uint32_t tag_id, time_point start, time_point stop) { - const LogEntry *old_list = _list.load(std::memory_order_relaxed); - _list.store(&_stash.create<LogEntry>(tag_id, start, stop, old_list), - std::memory_order_release); + Guard guard(_lock); + _list = &_stash.create<LogEntry>(tag_id, start, stop, _list); } }; static TimeTracer &master(); @@ -123,15 +159,17 @@ private: std::mutex _lock; std::vector<ThreadState::UP> _state_list; std::map<vespalib::string, uint32_t> _tags; + std::vector<vespalib::string> _tag_names; TimeTracer(); ~TimeTracer(); uint32_t get_tag_id(const vespalib::string &tag_name); + vespalib::string get_tag_name(uint32_t tag_id); ThreadState *create_thread_state(); - std::vector<Record> extract_all_impl(); + std::vector<Record> extract_impl(const Extractor &extractor); public: - static std::vector<Record> extract_all(); + static Extractor extract() { return Extractor(); } }; } // namespace vespalib::test |