diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-04-21 22:36:24 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-21 22:36:24 +0200 |
commit | 8545327c3b81679860eec19d8ac5ab5fec953734 (patch) | |
tree | 570c0a87eb8491dc6921982dc0e3c30053b528db | |
parent | f1c7a5bdb741586598b4ea1b964aff407e1a3065 (diff) | |
parent | 5f241b23e800b3c9ec6c2d195649a9f6a7c63b88 (diff) |
Merge pull request #5649 from vespa-engine/balder/transport-independent-docsum-metrics
Balder/transport independent docsum metrics
8 files changed, 107 insertions, 56 deletions
diff --git a/searchcore/src/tests/proton/summaryengine/summaryengine.cpp b/searchcore/src/tests/proton/summaryengine/summaryengine.cpp index 408ca27c16e..8206eba6350 100644 --- a/searchcore/src/tests/proton/summaryengine/summaryengine.cpp +++ b/searchcore/src/tests/proton/summaryengine/summaryengine.cpp @@ -8,9 +8,12 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> #include <vespa/searchlib/common/transport.h> +#include <vespa/metrics/metricset.h> #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/log/log.h> +#include <vespa/metrics/metrics.h> + LOG_SETUP("summaryengine_test"); using namespace search::engine; @@ -204,6 +207,9 @@ TEST("requireThatCorrectHandlerIsUsed") { EXPECT_TRUE(assertDocsumReply(engine, "bar", "bar reply")); EXPECT_TRUE(assertDocsumReply(engine, "baz", "baz reply")); EXPECT_TRUE(assertDocsumReply(engine, "not", "bar reply")); // uses the first (sorted on name) + EXPECT_EQUAL(4ul, static_cast<metrics::LongCountMetric *>(engine.getMetrics().getMetric("count"))->getValue()); + EXPECT_EQUAL(4ul, static_cast<metrics::LongCountMetric *>(engine.getMetrics().getMetric("docs"))->getValue()); + EXPECT_LESS(0.0, static_cast<metrics::DoubleAverageMetric *>(engine.getMetrics().getMetric("latency"))->getAverage()); } using vespalib::Slime; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 492618b0472..32dc711d5cf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -13,20 +13,26 @@ #include "searchhandlerproxy.h" #include "simpleflush.h" -#include <vespa/document/base/exceptions.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/searchcommon/common/schemaconfigurer.h> +#include <vespa/searchcore/proton/flushengine/flushengine.h> #include <vespa/searchcore/proton/flushengine/flush_engine_explorer.h> #include <vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h> #include <vespa/searchcore/proton/flushengine/tls_stats_factory.h> #include <vespa/searchcore/proton/reference/document_db_reference_registry.h> +#include <vespa/searchcore/proton/summaryengine/summaryengine.h> +#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h> +#include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/searchlib/transactionlog/trans_log_server_explorer.h> #include <vespa/searchlib/util/fileheadertk.h> +#include <vespa/document/base/exceptions.h> +#include <vespa/document/datatype/documenttype.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/random.h> +#include <vespa/searchlib/engine/transportserver.h> +#include <vespa/vespalib/net/state_server.h> #include <vespa/searchlib/aggregation/forcelink.hpp> #include <vespa/searchlib/expression/forcelink.hpp> @@ -130,10 +136,8 @@ Proton::ProtonFileHeaderContext::addTags(vespalib::GenericHeader &header, void -Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & - clusterName, - const vespalib::string & - baseDir) +Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & clusterName, + const vespalib::string & baseDir) { if (!clusterName.empty()) { _cluster = clusterName; @@ -243,12 +247,12 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) _tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext); _metricsEngine->addMetricsHook(_metricsHook); _fileHeaderContext.setClusterName(protonConfig.clustername, protonConfig.basedir); - _matchEngine.reset(new MatchEngine(protonConfig.numsearcherthreads, - protonConfig.numthreadspersearch, - protonConfig.distributionkey)); + _matchEngine = std::make_unique<MatchEngine>(protonConfig.numsearcherthreads, + protonConfig.numthreadspersearch, + protonConfig.distributionkey); _distributionKey = protonConfig.distributionkey; - _summaryEngine.reset(new SummaryEngine(protonConfig.numsummarythreads)); - _docsumBySlime.reset(new DocsumBySlime(*_summaryEngine)); + _summaryEngine= std::make_unique<SummaryEngine>(protonConfig.numsummarythreads); + _docsumBySlime = std::make_unique<DocsumBySlime>(*_summaryEngine); IFlushStrategy::SP strategy; const ProtonConfig::Flush & flush(protonConfig.flush); switch (flush.strategy) { @@ -262,17 +266,18 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) } case ProtonConfig::Flush::SIMPLE: default: - strategy.reset(new SimpleFlush()); + strategy = std::make_shared<SimpleFlush>(); break; } vespalib::mkdir(protonConfig.basedir + "/documents", true); vespalib::chdir(protonConfig.basedir); _tls->start(); - _flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), - strategy, flush.maxconcurrent, flush.idleinterval*1000)); - _fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL)); + _flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), + strategy, flush.maxconcurrent, flush.idleinterval*1000); + _fs4Server = std::make_unique<TransportServer>(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL); _fs4Server->setTCPNoDelay(true); _metricsEngine->addExternalMetrics(_fs4Server->getMetrics()); + _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics()); char tmp[1024]; LOG(debug, "Start proton server with root at %s and cwd at %s", @@ -305,7 +310,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) waitForInitDone(); _metricsEngine->start(_configUri); - _stateServer.reset(new vespalib::StateServer(protonConfig.httpport, _healthAdapter, _metricsEngine->metrics_producer(), *this)); + _stateServer = std::make_unique<vespalib::StateServer>(protonConfig.httpport, _healthAdapter, + _metricsEngine->metrics_producer(), *this); _customComponentBindToken = _stateServer->repo().bind(CUSTOM_COMPONENT_API_PATH, _genericStateHandler); _customComponentRootToken = _stateServer->repo().add_root_resource(CUSTOM_COMPONENT_API_PATH); @@ -399,6 +405,9 @@ Proton::~Proton() if (_matchEngine) { _matchEngine->close(); } + if (_metricsEngine && _summaryEngine) { + _metricsEngine->removeExternalMetrics(_summaryEngine->getMetrics()); + } if (_summaryEngine) { _summaryEngine->close(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 6dfc6c429a3..dd2b42d94cf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -12,33 +12,36 @@ #include "proton_config_fetcher.h" #include "proton_configurer.h" #include "rpc_hooks.h" -#include <vespa/searchcore/proton/flushengine/flushengine.h> -#include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/searchcore/proton/matching/querylimiter.h> #include <vespa/searchcore/proton/metrics/metrics_engine.h> #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> -#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h> -#include <vespa/searchcore/proton/summaryengine/summaryengine.h> #include <vespa/searchlib/common/fileheadercontext.h> #include <vespa/searchlib/engine/monitorapi.h> -#include <vespa/searchlib/engine/transportserver.h> #include <vespa/searchlib/transactionlog/translogserverapp.h> #include <vespa/vespalib/net/component_config_producer.h> #include <vespa/vespalib/net/generic_state_handler.h> #include <vespa/vespalib/net/json_get_handler.h> +#include <vespa/vespalib/net/json_handler_repo.h> #include <vespa/vespalib/net/state_explorer.h> -#include <vespa/vespalib/net/state_server.h> #include <vespa/vespalib/util/varholder.h> #include <mutex> #include <shared_mutex> +namespace search::engine { class TransportServer; } + +namespace vespalib { class StateServer; } + namespace proton { class DiskMemUsageSampler; class IDocumentDBReferenceRegistry; class PrepareRestartHandler; +class SummaryEngine; +class DocsumBySlime; +class FlushEngine; +class MatchEngine; class Proton : public IProtonConfigurerOwner, public search::engine::MonitorServer, @@ -50,7 +53,7 @@ class Proton : public IProtonConfigurerOwner, { private: typedef search::transactionlog::TransLogServerApp TLS; - typedef search::engine::TransportServer TransportServer; + using TransportServer = search::engine::TransportServer; typedef search::engine::MonitorRequest MonitorRequest; typedef search::engine::MonitorReply MonitorReply; typedef search::engine::MonitorClient MonitorClient; @@ -88,25 +91,25 @@ private: const config::ConfigUri _configUri; mutable std::shared_timed_mutex _mutex; MetricsUpdateHook _metricsHook; - MetricsEngine::UP _metricsEngine; + std::unique_ptr<MetricsEngine> _metricsEngine; ProtonFileHeaderContext _fileHeaderContext; TLS::UP _tls; std::unique_ptr<DiskMemUsageSampler> _diskMemUsageSampler; PersistenceEngine::UP _persistenceEngine; DocumentDBMap _documentDBMap; - MatchEngine::UP _matchEngine; - SummaryEngine::UP _summaryEngine; - DocsumBySlime::UP _docsumBySlime; + std::unique_ptr<MatchEngine> _matchEngine; + std::unique_ptr<SummaryEngine> _summaryEngine; + std::unique_ptr<DocsumBySlime> _docsumBySlime; MemoryFlushConfigUpdater::UP _memoryFlushConfigUpdater; - FlushEngine::UP _flushEngine; + std::unique_ptr<FlushEngine> _flushEngine; std::unique_ptr<PrepareRestartHandler> _prepareRestartHandler; RPCHooks::UP _rpcHooks; HealthAdapter _healthAdapter; vespalib::GenericStateHandler _genericStateHandler; vespalib::JsonHandlerRepo::Token::UP _customComponentBindToken; vespalib::JsonHandlerRepo::Token::UP _customComponentRootToken; - vespalib::StateServer::UP _stateServer; - TransportServer::UP _fs4Server; + std::unique_ptr<vespalib::StateServer> _stateServer; + std::unique_ptr<TransportServer> _fs4Server; vespalib::ThreadStackExecutor _executor; ProtonConfigurer _protonConfigurer; ProtonConfigFetcher _protonConfigFetcher; diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index b80e4ff87ea..ab012760762 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -2,6 +2,8 @@ #include "rpc_hooks.h" #include "proton.h" +#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h> +#include <vespa/searchcore/proton/matchengine/matchengine.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/fnet/frt/supervisor.h> @@ -24,7 +26,7 @@ struct Pair { ~Pair(); }; -Pair::~Pair() {} +Pair::~Pair() = default; } @@ -46,7 +48,7 @@ RPCHooksBase::checkState(StateArg::UP arg) Session & session = *arg->_session; Executor::Task::UP failedTask = _executor.execute(makeTask( makeClosure(this, &RPCHooksBase::checkState, std::move(arg)))); - if (failedTask.get() != NULL) { + if (failedTask) { reportState(session, req); req->Return(); } @@ -273,7 +275,7 @@ RPCHooksBase::rpc_GetState(FRT_RPCRequest *req) } else { fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS); StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime)); - if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg)))).get() != NULL) { + if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) { reportState(*sharedSession, req); req->Return(); } else { @@ -342,7 +344,7 @@ RPCHooksBase::rpc_getIncrementalState(FRT_RPCRequest *req) } else { fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS); StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime)); - if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg)))).get() != NULL) { + if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) { reportState(*sharedSession, req); req->Return(); } else { diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp index 30e9382ae14..498c8b914db 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp @@ -32,14 +32,23 @@ public: namespace proton { +SummaryEngine::DocsumMetrics::DocsumMetrics() + : metrics::MetricSet("docsum", "", "Docsum metrics", nullptr), + count("count", "logdefault", "Docsum requests handled", this), + docs("docs", "logdefault", "Total docsums returned", this), + latency("latency", "logdefault", "Docsum request latency", this) +{ +} + +SummaryEngine::DocsumMetrics::~DocsumMetrics() = default; + SummaryEngine::SummaryEngine(size_t numThreads) : _lock(), _closed(false), _handlers(), - _executor(numThreads, 128 * 1024) -{ - // empty -} + _executor(numThreads, 128 * 1024), + _metrics(std::make_unique<DocsumMetrics>()) +{ } SummaryEngine::~SummaryEngine() { @@ -114,10 +123,21 @@ SummaryEngine::getDocsums(DocsumRequest::UP req) reply = snapshot->get()->getDocsums(*req); // use the first handler } } + updateDocsumMetrics(req->getTimeUsed().sec(), reply->docsums.size()); } reply->request = std::move(req); + return reply; } +void +SummaryEngine::updateDocsumMetrics(double latency_s, uint32_t numDocs) +{ + std::lock_guard guard(_lock); + DocsumMetrics & m = static_cast<DocsumMetrics &>(*_metrics); + m.count.inc(); + m.docs.inc(numDocs); + m.latency.set(latency_s); +} } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h index 2420a656909..029aefacfc8 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h @@ -1,33 +1,43 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "isearchhandler.h" +#include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/searchcore/proton/common/handlermap.hpp> -#include <vespa/searchcore/proton/summaryengine/isearchhandler.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/vespalib/util/threadstackexecutor.h> -#include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/metrics/valuemetric.h> +#include <vespa/metrics/countmetric.h> +#include <vespa/metrics/metricset.h> #include <mutex> + namespace proton { class SummaryEngine : public search::engine::DocsumServer { private: + void updateDocsumMetrics(double latency_s, uint32_t numDocs); using DocsumReply = search::engine::DocsumReply; using DocsumRequest = search::engine::DocsumRequest; using DocsumClient = search::engine::DocsumClient; + struct DocsumMetrics : metrics::MetricSet { + metrics::LongCountMetric count; + metrics::LongCountMetric docs; + metrics::DoubleAverageMetric latency; + + DocsumMetrics(); + ~DocsumMetrics(); + }; + std::mutex _lock; bool _closed; HandlerMap<ISearchHandler> _handlers; vespalib::ThreadStackExecutor _executor; + std::unique_ptr<metrics::MetricSet> _metrics; public: - /** - * Convenience typedefs. - */ - typedef std::unique_ptr<SummaryEngine> UP; - typedef std::shared_ptr<SummaryEngine> SP; SummaryEngine(const SummaryEngine &) = delete; SummaryEngine & operator = (const SummaryEngine &) = delete; @@ -105,6 +115,8 @@ public: * @param req The docsum request to perform. */ DocsumReply::UP getDocsums(DocsumRequest::UP req) override; + + metrics::MetricSet & getMetrics() { return *_metrics; } }; } // namespace proton diff --git a/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp b/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp index b3696641f8e..d291f968379 100644 --- a/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp +++ b/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp @@ -24,7 +24,7 @@ TransportMetrics::DocsumMetrics::DocsumMetrics(metrics::MetricSet *parent) TransportMetrics::DocsumMetrics::~DocsumMetrics() = default; TransportMetrics::TransportMetrics() - : metrics::MetricSet("transport", "", "Transport server metrics", 0), + : metrics::MetricSet("transport", "", "Transport server metrics", nullptr), updateLock(), query(this), docsum(this) diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.cpp b/searchlib/src/vespa/searchlib/engine/transportserver.cpp index c97995db44d..005ef473817 100644 --- a/searchlib/src/vespa/searchlib/engine/transportserver.cpp +++ b/searchlib/src/vespa/searchlib/engine/transportserver.cpp @@ -32,7 +32,7 @@ void TransportServer::SearchHandler::start() { SearchReply::UP reply = parent._searchServer.search(std::move(request), *this); - if (reply.get() != 0) { + if (reply) { searchDone(std::move(reply)); } } @@ -40,7 +40,7 @@ TransportServer::SearchHandler::start() void TransportServer::SearchHandler::searchDone(SearchReply::UP reply) { - if (reply.get() != 0) { + if (reply) { const SearchReply &r = *reply; if (r.valid) { if (r.errorCode == 0) { @@ -86,7 +86,7 @@ void TransportServer::DocsumHandler::start() { DocsumReply::UP reply = parent._docsumServer.getDocsums(std::move(request), *this); - if (reply.get() != 0) { + if (reply) { getDocsumsDone(std::move(reply)); } } @@ -94,7 +94,7 @@ TransportServer::DocsumHandler::start() void TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply) { - if (reply.get() != 0) { + if (reply) { const DocsumReply &r = *reply; for (uint32_t i = 0; i < r.docsums.size(); ++i) { PacketConverter::DOCSUM *p = new PacketConverter::DOCSUM(); @@ -109,9 +109,8 @@ TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply) logPacket("outgoing packet", p, channel, 0); } channel->Send(p); - if (r.request.get() != NULL) { - parent.updateDocsumMetrics(r.request->getTimeUsed().sec(), - r.docsums.size()); // possible thread issue + if (r.request) { + parent.updateDocsumMetrics(r.request->getTimeUsed().sec(), r.docsums.size()); } } else { LOG(warning, "got <null> docsum reply from back-end"); @@ -130,7 +129,7 @@ void TransportServer::MonitorHandler::start() { MonitorReply::UP reply = parent._monitorServer.ping(std::move(request), *this); - if (reply.get() != 0) { + if (reply) { pingDone(std::move(reply)); } } @@ -138,7 +137,7 @@ TransportServer::MonitorHandler::start() void TransportServer::MonitorHandler::pingDone(MonitorReply::UP reply) { - if (reply.get() != 0) { + if (reply) { const MonitorReply &r = *reply; PacketConverter::MONITORRESULTX *p = new PacketConverter::MONITORRESULTX(); PacketConverter::fromMonitorReply(r, *p); |