From 76b389f2477812e9f37145f5e59cce70cc83df19 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Thu, 29 Aug 2019 13:28:40 +0000 Subject: add search protocol metrics --- .../proto_rpc_adapter/proto_rpc_adapter_test.cpp | 59 ++++++++++++++++++ .../src/vespa/searchlib/engine/CMakeLists.txt | 1 + .../vespa/searchlib/engine/proto_rpc_adapter.cpp | 50 ++++++++++----- .../src/vespa/searchlib/engine/proto_rpc_adapter.h | 5 ++ .../searchlib/engine/search_protocol_metrics.cpp | 55 +++++++++++++++++ .../searchlib/engine/search_protocol_metrics.h | 71 ++++++++++++++++++++++ 6 files changed, 226 insertions(+), 15 deletions(-) create mode 100644 searchlib/src/vespa/searchlib/engine/search_protocol_metrics.cpp create mode 100644 searchlib/src/vespa/searchlib/engine/search_protocol_metrics.h (limited to 'searchlib') diff --git a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp index 89763f54f3d..3dbe0d00881 100644 --- a/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp +++ b/searchlib/src/tests/engine/proto_rpc_adapter/proto_rpc_adapter_test.cpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Winline" @@ -25,6 +27,8 @@ using ProtoDocsumRequest = ProtoRpcAdapter::ProtoDocsumRequest; using ProtoDocsumReply = ProtoRpcAdapter::ProtoDocsumReply; using ProtoMonitorRequest = ProtoRpcAdapter::ProtoMonitorRequest; using ProtoMonitorReply = ProtoRpcAdapter::ProtoMonitorReply; +using QueryStats = SearchProtocolMetrics::QueryStats; +using DocsumStats = SearchProtocolMetrics::DocsumStats; struct MySearchServer : SearchServer { SearchReply::UP search(SearchRequest::Source src, SearchClient &client) override { @@ -32,6 +36,8 @@ struct MySearchServer : SearchServer { assert(req); auto reply = std::make_unique(); reply->totalHitCount = req->offset; // simplified search implementation + reply->request = std::move(req); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); client.searchDone(std::move(reply)); // simplified async response return std::unique_ptr(); } @@ -46,6 +52,8 @@ struct MyDocsumServer : DocsumServer { auto &list = reply->_root->setArray(); list.addObject().setBool("use_root_slime", req->useRootSlime()); list.addObject().setString("ranking", req->ranking); + reply->request = std::move(req); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); client.getDocsumsDone(std::move(reply)); // simplified async response return std::unique_ptr(); } @@ -80,6 +88,38 @@ struct ProtoRpcAdapterTest : ::testing::Test { //----------------------------------------------------------------------------- +TEST(QueryMetricTest, require_that_update_query_metrics_works_as_intended) { + SearchProtocolMetrics metrics; + QueryStats stats; + stats.latency = 0.25; + stats.request_size = 1000; + stats.reply_size = 500; + metrics.update_query_metrics(stats); + EXPECT_EQ(metrics.query().latency.getCount(), 1); + EXPECT_EQ(metrics.query().latency.getTotal(), 0.25); + EXPECT_EQ(metrics.query().request_size.getCount(), 1); + EXPECT_EQ(metrics.query().request_size.getTotal(), 1000); + EXPECT_EQ(metrics.query().reply_size.getCount(), 1); + EXPECT_EQ(metrics.query().reply_size.getTotal(), 500); +} + +TEST(DocsumMetricTest, require_that_update_docsum_metrics_works_as_intended) { + SearchProtocolMetrics metrics; + DocsumStats stats; + stats.latency = 0.25; + stats.request_size = 1000; + stats.reply_size = 500; + stats.requested_documents = 10; + metrics.update_docsum_metrics(stats); + EXPECT_EQ(metrics.docsum().latency.getCount(), 1); + EXPECT_EQ(metrics.docsum().latency.getTotal(), 0.25); + EXPECT_EQ(metrics.docsum().request_size.getCount(), 1); + EXPECT_EQ(metrics.docsum().request_size.getTotal(), 1000); + EXPECT_EQ(metrics.docsum().reply_size.getCount(), 1); + EXPECT_EQ(metrics.docsum().reply_size.getTotal(), 500); + EXPECT_EQ(metrics.docsum().requested_documents.getValue(), 10); +} + TEST_F(ProtoRpcAdapterTest, require_that_plain_rpc_ping_works) { auto target = connect(); auto *req = new FRT_RPCRequest(); @@ -110,6 +150,12 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_search_works) { rpc->SubRef(); } target->SubRef(); + SearchProtocolMetrics &metrics = adapter.metrics(); + EXPECT_EQ(metrics.query().latency.getCount(), 2); + EXPECT_GT(metrics.query().latency.getTotal(), 0.0); + EXPECT_GT(metrics.query().request_size.getTotal(), 0); + EXPECT_GT(metrics.query().reply_size.getTotal(), 0); + EXPECT_EQ(metrics.docsum().latency.getCount(), 0); } TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) { @@ -118,6 +164,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) { auto *rpc = new FRT_RPCRequest(); ProtoDocsumRequest req; req.set_rank_profile("mlr"); + req.add_global_ids("foo"); + req.add_global_ids("bar"); + req.add_global_ids("baz"); ProtoRpcAdapter::encode_docsum_request(req, *rpc); target->InvokeSync(rpc, 60.0); if (online) { @@ -136,6 +185,13 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_getDocsums_works) { rpc->SubRef(); } target->SubRef(); + SearchProtocolMetrics &metrics = adapter.metrics(); + EXPECT_EQ(metrics.query().latency.getCount(), 0); + EXPECT_EQ(metrics.docsum().latency.getCount(), 2); + EXPECT_GT(metrics.docsum().latency.getTotal(), 0.0); + EXPECT_GT(metrics.docsum().request_size.getTotal(), 0); + EXPECT_GT(metrics.docsum().reply_size.getTotal(), 0); + EXPECT_EQ(metrics.docsum().requested_documents.getValue(), 6); } TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_ping_works) { @@ -157,6 +213,9 @@ TEST_F(ProtoRpcAdapterTest, require_that_proto_rpc_ping_works) { rpc->SubRef(); } target->SubRef(); + SearchProtocolMetrics &metrics = adapter.metrics(); + EXPECT_EQ(metrics.query().latency.getCount(), 0); + EXPECT_EQ(metrics.docsum().latency.getCount(), 0); } //----------------------------------------------------------------------------- diff --git a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt index 0dc183289c0..36ff4e12108 100644 --- a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt @@ -23,6 +23,7 @@ vespa_add_library(searchlib_engine OBJECT proto_converter.cpp proto_rpc_adapter.cpp request.cpp + search_protocol_metrics.cpp searchreply.cpp searchrequest.cpp trace.cpp diff --git a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp index fabfa638c33..389a9ad3e2a 100644 --- a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp +++ b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.cpp @@ -25,6 +25,8 @@ using ProtoDocsumRequest = ProtoConverter::ProtoDocsumRequest; using ProtoDocsumReply = ProtoConverter::ProtoDocsumReply; using ProtoMonitorRequest = ProtoConverter::ProtoMonitorRequest; using ProtoMonitorReply = ProtoConverter::ProtoMonitorReply; +using QueryStats = SearchProtocolMetrics::QueryStats; +using DocsumStats = SearchProtocolMetrics::DocsumStats; namespace { @@ -79,11 +81,13 @@ bool decode_message(const FRT_Values &src, MSG &dst) { struct SearchRequestDecoder : SearchRequest::Source::Decoder { FRT_RPCRequest &rpc; // valid until Return is called + QueryStats &stats; RelativeTime relative_time; - SearchRequestDecoder(FRT_RPCRequest &rpc_in) - : rpc(rpc_in), relative_time(std::make_unique()) {} + SearchRequestDecoder(FRT_RPCRequest &rpc_in, QueryStats &stats_in) + : rpc(rpc_in), stats(stats_in), relative_time(std::make_unique()) {} std::unique_ptr decode() override { ProtoSearchRequest msg; + stats.request_size = (*rpc.GetParams())[2]._data._len; if (!decode_message(*rpc.GetParams(), msg)) { LOG(warning, "got bad protobuf search request over rpc (unable to decode)"); return std::unique_ptr(nullptr); @@ -94,18 +98,24 @@ struct SearchRequestDecoder : SearchRequest::Source::Decoder { } }; -std::unique_ptr search_request_decoder(FRT_RPCRequest &rpc) { - return std::make_unique(rpc); +std::unique_ptr search_request_decoder(FRT_RPCRequest &rpc, QueryStats &stats) { + return std::make_unique(rpc, stats); } // allocated in the stash of the request it is completing; no self-delete needed struct SearchCompletionHandler : SearchClient { FRT_RPCRequest &req; - SearchCompletionHandler(FRT_RPCRequest &req_in) : req(req_in) {} + SearchProtocolMetrics &metrics; + QueryStats stats; + SearchCompletionHandler(FRT_RPCRequest &req_in, SearchProtocolMetrics &metrics_in) + : req(req_in), metrics(metrics_in), stats() {} void searchDone(SearchReply::UP reply) override { ProtoSearchReply msg; ProtoConverter::search_reply_to_proto(*reply, msg); encode_search_reply(msg, *req.GetReturn()); + stats.reply_size = (*req.GetReturn())[2]._data._len; + stats.latency = reply->request->getTimeUsed().sec(); + metrics.update_query_metrics(stats); req.Return(); } }; @@ -114,33 +124,42 @@ struct SearchCompletionHandler : SearchClient { struct DocsumRequestDecoder : DocsumRequest::Source::Decoder { FRT_RPCRequest &rpc; // valid until Return is called + DocsumStats &stats; RelativeTime relative_time; - DocsumRequestDecoder(FRT_RPCRequest &rpc_in) - : rpc(rpc_in), relative_time(std::make_unique()) {} + DocsumRequestDecoder(FRT_RPCRequest &rpc_in, DocsumStats &stats_in) + : rpc(rpc_in), stats(stats_in), relative_time(std::make_unique()) {} std::unique_ptr decode() override { ProtoDocsumRequest msg; + stats.request_size = (*rpc.GetParams())[2]._data._len; if (!decode_message(*rpc.GetParams(), msg)) { LOG(warning, "got bad protobuf docsum request over rpc (unable to decode)"); return std::unique_ptr(nullptr); } + stats.requested_documents = msg.global_ids_size(); auto req = std::make_unique(std::move(relative_time), true); ProtoConverter::docsum_request_from_proto(msg, *req); return req; } }; -std::unique_ptr docsum_request_decoder(FRT_RPCRequest &rpc) { - return std::make_unique(rpc); +std::unique_ptr docsum_request_decoder(FRT_RPCRequest &rpc, DocsumStats &stats) { + return std::make_unique(rpc, stats); } // allocated in the stash of the request it is completing; no self-delete needed struct GetDocsumsCompletionHandler : DocsumClient { FRT_RPCRequest &req; - GetDocsumsCompletionHandler(FRT_RPCRequest &req_in) : req(req_in) {} + SearchProtocolMetrics &metrics; + DocsumStats stats; + GetDocsumsCompletionHandler(FRT_RPCRequest &req_in, SearchProtocolMetrics &metrics_in) + : req(req_in), metrics(metrics_in), stats() {} void getDocsumsDone(DocsumReply::UP reply) override { ProtoDocsumReply msg; ProtoConverter::docsum_reply_to_proto(*reply, msg); encode_message(msg, *req.GetReturn()); + stats.reply_size = (*req.GetReturn())[2]._data._len; + stats.latency = reply->request->getTimeUsed().sec(); + metrics.update_docsum_metrics(stats); req.Return(); } }; @@ -179,7 +198,8 @@ ProtoRpcAdapter::ProtoRpcAdapter(SearchServer &search_server, : _search_server(search_server), _docsum_server(docsum_server), _monitor_server(monitor_server), - _online(false) + _online(false), + _metrics() { FRT_ReflectionBuilder rb(&orb); //------------------------------------------------------------------------- @@ -207,8 +227,8 @@ ProtoRpcAdapter::rpc_search(FRT_RPCRequest *req) return req->SetError(FRTE_RPC_METHOD_FAILED, "Server not online"); } req->Detach(); - auto &client = req->getStash().create(*req); - auto reply = _search_server.search(search_request_decoder(*req), client); + auto &client = req->getStash().create(*req, _metrics); + auto reply = _search_server.search(search_request_decoder(*req, client.stats), client); if (reply) { client.searchDone(std::move(reply)); } @@ -221,8 +241,8 @@ ProtoRpcAdapter::rpc_getDocsums(FRT_RPCRequest *req) return req->SetError(FRTE_RPC_METHOD_FAILED, "Server not online"); } req->Detach(); - auto &client = req->getStash().create(*req); - auto reply = _docsum_server.getDocsums(docsum_request_decoder(*req), client); + auto &client = req->getStash().create(*req, _metrics); + auto reply = _docsum_server.getDocsums(docsum_request_decoder(*req, client.stats), client); if (reply) { client.getDocsumsDone(std::move(reply)); } diff --git a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h index 2ddfad0819c..ba542423bd8 100644 --- a/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h +++ b/searchlib/src/vespa/searchlib/engine/proto_rpc_adapter.h @@ -6,6 +6,8 @@ #include "proto_converter.h" #include +#include "search_protocol_metrics.h" + class FRT_Supervisor; namespace search::engine { @@ -33,12 +35,15 @@ private: DocsumServer &_docsum_server; MonitorServer &_monitor_server; std::atomic _online; + SearchProtocolMetrics _metrics; public: ProtoRpcAdapter(SearchServer &search_server, DocsumServer &docsum_server, MonitorServer &monitor_server, FRT_Supervisor &orb); + SearchProtocolMetrics &metrics() { return _metrics; } + void set_online() { _online.store(true, std::memory_order_release); } bool is_online() const { return _online.load(std::memory_order_acquire); } diff --git a/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.cpp b/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.cpp new file mode 100644 index 00000000000..fe9fddeffd7 --- /dev/null +++ b/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.cpp @@ -0,0 +1,55 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "search_protocol_metrics.h" + +namespace search::engine { + +SearchProtocolMetrics::QueryMetrics::QueryMetrics(metrics::MetricSet *parent) + : metrics::MetricSet("query", {}, "Query metrics", parent), + latency("latency", {{"logdefault"}}, "Query request latency (seconds)", this), + request_size("request_size", {{"logdefault"}}, "Query request size (network bytes)", this), + reply_size("reply_size", {{"logdefault"}}, "Query reply size (network bytes)", this) +{ +} +SearchProtocolMetrics::QueryMetrics::~QueryMetrics() = default; + +SearchProtocolMetrics::DocsumMetrics::DocsumMetrics(metrics::MetricSet *parent) + : metrics::MetricSet("docsum", {}, "Docsum metrics", parent), + latency("latency", {{"logdefault"}}, "Docsum request latency (seconds)", this), + request_size("request_size", {{"logdefault"}}, "Docsum request size (network bytes)", this), + reply_size("reply_size", {{"logdefault"}}, "Docsum reply size (network bytes)", this), + requested_documents("requested_documents", {{"logdefault"}}, "Total requested document summaries", this) +{ +} +SearchProtocolMetrics::DocsumMetrics::~DocsumMetrics() = default; + +SearchProtocolMetrics::SearchProtocolMetrics() + : metrics::MetricSet("search_protocol", {}, "Search protocol server metrics", nullptr), + _lock(), + _query(this), + _docsum(this) +{ +} + +SearchProtocolMetrics::~SearchProtocolMetrics() = default; + +void +SearchProtocolMetrics::update_query_metrics(const QueryStats &stats) +{ + auto guard = std::lock_guard(_lock); + _query.latency.set(stats.latency); + _query.request_size.set(stats.request_size); + _query.reply_size.set(stats.reply_size); +} + +void +SearchProtocolMetrics::update_docsum_metrics(const DocsumStats &stats) +{ + auto guard = std::lock_guard(_lock); + _docsum.latency.set(stats.latency); + _docsum.request_size.set(stats.request_size); + _docsum.reply_size.set(stats.reply_size); + _docsum.requested_documents.inc(stats.requested_documents); +} + +} diff --git a/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.h b/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.h new file mode 100644 index 00000000000..378a045e32b --- /dev/null +++ b/searchlib/src/vespa/searchlib/engine/search_protocol_metrics.h @@ -0,0 +1,71 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include + +namespace search::engine { + +/** + * Metrics for the proto rpc adapter component implementing the search + * protocol in proton. + **/ +class SearchProtocolMetrics : public metrics::MetricSet +{ +public: + // sub-metrics for query request/reply + struct QueryMetrics : metrics::MetricSet { + metrics::DoubleAverageMetric latency; + metrics::LongAverageMetric request_size; + metrics::LongAverageMetric reply_size; + + QueryMetrics(metrics::MetricSet *parent); + ~QueryMetrics() override; + }; + + // value-wrapper used when collecting and reporting query metrics + struct QueryStats { + double latency; + size_t request_size; + size_t reply_size; + QueryStats() : latency(0.0), request_size(0), reply_size(0) {} + }; + + // sub-metrics for docsum request/reply + struct DocsumMetrics : metrics::MetricSet { + metrics::DoubleAverageMetric latency; + metrics::LongAverageMetric request_size; + metrics::LongAverageMetric reply_size; + metrics::LongCountMetric requested_documents; + + DocsumMetrics(metrics::MetricSet *parent); + ~DocsumMetrics() override; + }; + + // value-wrapper used when collecting and reporting docsum metrics + struct DocsumStats { + double latency; + size_t request_size; + size_t reply_size; + size_t requested_documents; + DocsumStats() : latency(0.0), request_size(0), reply_size(0), requested_documents(0) {} + }; + +private: + std::mutex _lock; + QueryMetrics _query; + DocsumMetrics _docsum; + +public: + SearchProtocolMetrics(); + ~SearchProtocolMetrics() override; + + const QueryMetrics &query() const { return _query; } + const DocsumMetrics &docsum() const { return _docsum; } + + void update_query_metrics(const QueryStats &stats); + void update_docsum_metrics(const DocsumStats &stats); +}; + +} -- cgit v1.2.3