summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-04-21 22:36:24 +0200
committerGitHub <noreply@github.com>2018-04-21 22:36:24 +0200
commit8545327c3b81679860eec19d8ac5ab5fec953734 (patch)
tree570c0a87eb8491dc6921982dc0e3c30053b528db
parentf1c7a5bdb741586598b4ea1b964aff407e1a3065 (diff)
parent5f241b23e800b3c9ec6c2d195649a9f6a7c63b88 (diff)
Merge pull request #5649 from vespa-engine/balder/transport-independent-docsum-metrics
Balder/transport independent docsum metrics
-rw-r--r--searchcore/src/tests/proton/summaryengine/summaryengine.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp43
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h31
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h26
-rw-r--r--searchlib/src/vespa/searchlib/engine/transport_metrics.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/engine/transportserver.cpp17
8 files changed, 107 insertions, 56 deletions
diff --git a/searchcore/src/tests/proton/summaryengine/summaryengine.cpp b/searchcore/src/tests/proton/summaryengine/summaryengine.cpp
index 408ca27c16e..8206eba6350 100644
--- a/searchcore/src/tests/proton/summaryengine/summaryengine.cpp
+++ b/searchcore/src/tests/proton/summaryengine/summaryengine.cpp
@@ -8,9 +8,12 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
#include <vespa/searchlib/common/transport.h>
+#include <vespa/metrics/metricset.h>
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/log/log.h>
+#include <vespa/metrics/metrics.h>
+
LOG_SETUP("summaryengine_test");
using namespace search::engine;
@@ -204,6 +207,9 @@ TEST("requireThatCorrectHandlerIsUsed") {
EXPECT_TRUE(assertDocsumReply(engine, "bar", "bar reply"));
EXPECT_TRUE(assertDocsumReply(engine, "baz", "baz reply"));
EXPECT_TRUE(assertDocsumReply(engine, "not", "bar reply")); // uses the first (sorted on name)
+ EXPECT_EQUAL(4ul, static_cast<metrics::LongCountMetric *>(engine.getMetrics().getMetric("count"))->getValue());
+ EXPECT_EQUAL(4ul, static_cast<metrics::LongCountMetric *>(engine.getMetrics().getMetric("docs"))->getValue());
+ EXPECT_LESS(0.0, static_cast<metrics::DoubleAverageMetric *>(engine.getMetrics().getMetric("latency"))->getAverage());
}
using vespalib::Slime;
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 492618b0472..32dc711d5cf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -13,20 +13,26 @@
#include "searchhandlerproxy.h"
#include "simpleflush.h"
-#include <vespa/document/base/exceptions.h>
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/searchcommon/common/schemaconfigurer.h>
+#include <vespa/searchcore/proton/flushengine/flushengine.h>
#include <vespa/searchcore/proton/flushengine/flush_engine_explorer.h>
#include <vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h>
#include <vespa/searchcore/proton/flushengine/tls_stats_factory.h>
#include <vespa/searchcore/proton/reference/document_db_reference_registry.h>
+#include <vespa/searchcore/proton/summaryengine/summaryengine.h>
+#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h>
+#include <vespa/searchcore/proton/matchengine/matchengine.h>
#include <vespa/searchlib/transactionlog/trans_log_server_explorer.h>
#include <vespa/searchlib/util/fileheadertk.h>
+#include <vespa/document/base/exceptions.h>
+#include <vespa/document/datatype/documenttype.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/host_name.h>
#include <vespa/vespalib/util/random.h>
+#include <vespa/searchlib/engine/transportserver.h>
+#include <vespa/vespalib/net/state_server.h>
#include <vespa/searchlib/aggregation/forcelink.hpp>
#include <vespa/searchlib/expression/forcelink.hpp>
@@ -130,10 +136,8 @@ Proton::ProtonFileHeaderContext::addTags(vespalib::GenericHeader &header,
void
-Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string &
- clusterName,
- const vespalib::string &
- baseDir)
+Proton::ProtonFileHeaderContext::setClusterName(const vespalib::string & clusterName,
+ const vespalib::string & baseDir)
{
if (!clusterName.empty()) {
_cluster = clusterName;
@@ -243,12 +247,12 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_tls = std::make_unique<TLS>(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext);
_metricsEngine->addMetricsHook(_metricsHook);
_fileHeaderContext.setClusterName(protonConfig.clustername, protonConfig.basedir);
- _matchEngine.reset(new MatchEngine(protonConfig.numsearcherthreads,
- protonConfig.numthreadspersearch,
- protonConfig.distributionkey));
+ _matchEngine = std::make_unique<MatchEngine>(protonConfig.numsearcherthreads,
+ protonConfig.numthreadspersearch,
+ protonConfig.distributionkey);
_distributionKey = protonConfig.distributionkey;
- _summaryEngine.reset(new SummaryEngine(protonConfig.numsummarythreads));
- _docsumBySlime.reset(new DocsumBySlime(*_summaryEngine));
+ _summaryEngine= std::make_unique<SummaryEngine>(protonConfig.numsummarythreads);
+ _docsumBySlime = std::make_unique<DocsumBySlime>(*_summaryEngine);
IFlushStrategy::SP strategy;
const ProtonConfig::Flush & flush(protonConfig.flush);
switch (flush.strategy) {
@@ -262,17 +266,18 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
}
case ProtonConfig::Flush::SIMPLE:
default:
- strategy.reset(new SimpleFlush());
+ strategy = std::make_shared<SimpleFlush>();
break;
}
vespalib::mkdir(protonConfig.basedir + "/documents", true);
vespalib::chdir(protonConfig.basedir);
_tls->start();
- _flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
- strategy, flush.maxconcurrent, flush.idleinterval*1000));
- _fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL));
+ _flushEngine = std::make_unique<FlushEngine>(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
+ strategy, flush.maxconcurrent, flush.idleinterval*1000);
+ _fs4Server = std::make_unique<TransportServer>(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL);
_fs4Server->setTCPNoDelay(true);
_metricsEngine->addExternalMetrics(_fs4Server->getMetrics());
+ _metricsEngine->addExternalMetrics(_summaryEngine->getMetrics());
char tmp[1024];
LOG(debug, "Start proton server with root at %s and cwd at %s",
@@ -305,7 +310,8 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
waitForInitDone();
_metricsEngine->start(_configUri);
- _stateServer.reset(new vespalib::StateServer(protonConfig.httpport, _healthAdapter, _metricsEngine->metrics_producer(), *this));
+ _stateServer = std::make_unique<vespalib::StateServer>(protonConfig.httpport, _healthAdapter,
+ _metricsEngine->metrics_producer(), *this);
_customComponentBindToken = _stateServer->repo().bind(CUSTOM_COMPONENT_API_PATH, _genericStateHandler);
_customComponentRootToken = _stateServer->repo().add_root_resource(CUSTOM_COMPONENT_API_PATH);
@@ -399,6 +405,9 @@ Proton::~Proton()
if (_matchEngine) {
_matchEngine->close();
}
+ if (_metricsEngine && _summaryEngine) {
+ _metricsEngine->removeExternalMetrics(_summaryEngine->getMetrics());
+ }
if (_summaryEngine) {
_summaryEngine->close();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 6dfc6c429a3..dd2b42d94cf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -12,33 +12,36 @@
#include "proton_config_fetcher.h"
#include "proton_configurer.h"
#include "rpc_hooks.h"
-#include <vespa/searchcore/proton/flushengine/flushengine.h>
-#include <vespa/searchcore/proton/matchengine/matchengine.h>
#include <vespa/searchcore/proton/matching/querylimiter.h>
#include <vespa/searchcore/proton/metrics/metrics_engine.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h>
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
-#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h>
-#include <vespa/searchcore/proton/summaryengine/summaryengine.h>
#include <vespa/searchlib/common/fileheadercontext.h>
#include <vespa/searchlib/engine/monitorapi.h>
-#include <vespa/searchlib/engine/transportserver.h>
#include <vespa/searchlib/transactionlog/translogserverapp.h>
#include <vespa/vespalib/net/component_config_producer.h>
#include <vespa/vespalib/net/generic_state_handler.h>
#include <vespa/vespalib/net/json_get_handler.h>
+#include <vespa/vespalib/net/json_handler_repo.h>
#include <vespa/vespalib/net/state_explorer.h>
-#include <vespa/vespalib/net/state_server.h>
#include <vespa/vespalib/util/varholder.h>
#include <mutex>
#include <shared_mutex>
+namespace search::engine { class TransportServer; }
+
+namespace vespalib { class StateServer; }
+
namespace proton {
class DiskMemUsageSampler;
class IDocumentDBReferenceRegistry;
class PrepareRestartHandler;
+class SummaryEngine;
+class DocsumBySlime;
+class FlushEngine;
+class MatchEngine;
class Proton : public IProtonConfigurerOwner,
public search::engine::MonitorServer,
@@ -50,7 +53,7 @@ class Proton : public IProtonConfigurerOwner,
{
private:
typedef search::transactionlog::TransLogServerApp TLS;
- typedef search::engine::TransportServer TransportServer;
+ using TransportServer = search::engine::TransportServer;
typedef search::engine::MonitorRequest MonitorRequest;
typedef search::engine::MonitorReply MonitorReply;
typedef search::engine::MonitorClient MonitorClient;
@@ -88,25 +91,25 @@ private:
const config::ConfigUri _configUri;
mutable std::shared_timed_mutex _mutex;
MetricsUpdateHook _metricsHook;
- MetricsEngine::UP _metricsEngine;
+ std::unique_ptr<MetricsEngine> _metricsEngine;
ProtonFileHeaderContext _fileHeaderContext;
TLS::UP _tls;
std::unique_ptr<DiskMemUsageSampler> _diskMemUsageSampler;
PersistenceEngine::UP _persistenceEngine;
DocumentDBMap _documentDBMap;
- MatchEngine::UP _matchEngine;
- SummaryEngine::UP _summaryEngine;
- DocsumBySlime::UP _docsumBySlime;
+ std::unique_ptr<MatchEngine> _matchEngine;
+ std::unique_ptr<SummaryEngine> _summaryEngine;
+ std::unique_ptr<DocsumBySlime> _docsumBySlime;
MemoryFlushConfigUpdater::UP _memoryFlushConfigUpdater;
- FlushEngine::UP _flushEngine;
+ std::unique_ptr<FlushEngine> _flushEngine;
std::unique_ptr<PrepareRestartHandler> _prepareRestartHandler;
RPCHooks::UP _rpcHooks;
HealthAdapter _healthAdapter;
vespalib::GenericStateHandler _genericStateHandler;
vespalib::JsonHandlerRepo::Token::UP _customComponentBindToken;
vespalib::JsonHandlerRepo::Token::UP _customComponentRootToken;
- vespalib::StateServer::UP _stateServer;
- TransportServer::UP _fs4Server;
+ std::unique_ptr<vespalib::StateServer> _stateServer;
+ std::unique_ptr<TransportServer> _fs4Server;
vespalib::ThreadStackExecutor _executor;
ProtonConfigurer _protonConfigurer;
ProtonConfigFetcher _protonConfigFetcher;
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
index b80e4ff87ea..ab012760762 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
@@ -2,6 +2,8 @@
#include "rpc_hooks.h"
#include "proton.h"
+#include <vespa/searchcore/proton/summaryengine/docsum_by_slime.h>
+#include <vespa/searchcore/proton/matchengine/matchengine.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/fnet/frt/supervisor.h>
@@ -24,7 +26,7 @@ struct Pair {
~Pair();
};
-Pair::~Pair() {}
+Pair::~Pair() = default;
}
@@ -46,7 +48,7 @@ RPCHooksBase::checkState(StateArg::UP arg)
Session & session = *arg->_session;
Executor::Task::UP failedTask = _executor.execute(makeTask(
makeClosure(this, &RPCHooksBase::checkState, std::move(arg))));
- if (failedTask.get() != NULL) {
+ if (failedTask) {
reportState(session, req);
req->Return();
}
@@ -273,7 +275,7 @@ RPCHooksBase::rpc_GetState(FRT_RPCRequest *req)
} else {
fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS);
StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime));
- if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg)))).get() != NULL) {
+ if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) {
reportState(*sharedSession, req);
req->Return();
} else {
@@ -342,7 +344,7 @@ RPCHooksBase::rpc_getIncrementalState(FRT_RPCRequest *req)
} else {
fastos::TimeStamp dueTime(fastos::ClockSystem::now() + timeoutMS * fastos::TimeStamp::MS);
StateArg::UP stateArg(new StateArg(sharedSession, req, dueTime));
- if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg)))).get() != NULL) {
+ if (_executor.execute(makeTask(makeClosure(this, &RPCHooksBase::checkState, std::move(stateArg))))) {
reportState(*sharedSession, req);
req->Return();
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
index 30e9382ae14..498c8b914db 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
@@ -32,14 +32,23 @@ public:
namespace proton {
+SummaryEngine::DocsumMetrics::DocsumMetrics()
+ : metrics::MetricSet("docsum", "", "Docsum metrics", nullptr),
+ count("count", "logdefault", "Docsum requests handled", this),
+ docs("docs", "logdefault", "Total docsums returned", this),
+ latency("latency", "logdefault", "Docsum request latency", this)
+{
+}
+
+SummaryEngine::DocsumMetrics::~DocsumMetrics() = default;
+
SummaryEngine::SummaryEngine(size_t numThreads)
: _lock(),
_closed(false),
_handlers(),
- _executor(numThreads, 128 * 1024)
-{
- // empty
-}
+ _executor(numThreads, 128 * 1024),
+ _metrics(std::make_unique<DocsumMetrics>())
+{ }
SummaryEngine::~SummaryEngine()
{
@@ -114,10 +123,21 @@ SummaryEngine::getDocsums(DocsumRequest::UP req)
reply = snapshot->get()->getDocsums(*req); // use the first handler
}
}
+ updateDocsumMetrics(req->getTimeUsed().sec(), reply->docsums.size());
}
reply->request = std::move(req);
+
return reply;
}
+void
+SummaryEngine::updateDocsumMetrics(double latency_s, uint32_t numDocs)
+{
+ std::lock_guard guard(_lock);
+ DocsumMetrics & m = static_cast<DocsumMetrics &>(*_metrics);
+ m.count.inc();
+ m.docs.inc(numDocs);
+ m.latency.set(latency_s);
+}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
index 2420a656909..029aefacfc8 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h
@@ -1,33 +1,43 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "isearchhandler.h"
+#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/searchcore/proton/common/handlermap.hpp>
-#include <vespa/searchcore/proton/summaryengine/isearchhandler.h>
#include <vespa/searchlib/engine/docsumapi.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
-#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/metrics/valuemetric.h>
+#include <vespa/metrics/countmetric.h>
+#include <vespa/metrics/metricset.h>
#include <mutex>
+
namespace proton {
class SummaryEngine : public search::engine::DocsumServer
{
private:
+ void updateDocsumMetrics(double latency_s, uint32_t numDocs);
using DocsumReply = search::engine::DocsumReply;
using DocsumRequest = search::engine::DocsumRequest;
using DocsumClient = search::engine::DocsumClient;
+ struct DocsumMetrics : metrics::MetricSet {
+ metrics::LongCountMetric count;
+ metrics::LongCountMetric docs;
+ metrics::DoubleAverageMetric latency;
+
+ DocsumMetrics();
+ ~DocsumMetrics();
+ };
+
std::mutex _lock;
bool _closed;
HandlerMap<ISearchHandler> _handlers;
vespalib::ThreadStackExecutor _executor;
+ std::unique_ptr<metrics::MetricSet> _metrics;
public:
- /**
- * Convenience typedefs.
- */
- typedef std::unique_ptr<SummaryEngine> UP;
- typedef std::shared_ptr<SummaryEngine> SP;
SummaryEngine(const SummaryEngine &) = delete;
SummaryEngine & operator = (const SummaryEngine &) = delete;
@@ -105,6 +115,8 @@ public:
* @param req The docsum request to perform.
*/
DocsumReply::UP getDocsums(DocsumRequest::UP req) override;
+
+ metrics::MetricSet & getMetrics() { return *_metrics; }
};
} // namespace proton
diff --git a/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp b/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp
index b3696641f8e..d291f968379 100644
--- a/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp
+++ b/searchlib/src/vespa/searchlib/engine/transport_metrics.cpp
@@ -24,7 +24,7 @@ TransportMetrics::DocsumMetrics::DocsumMetrics(metrics::MetricSet *parent)
TransportMetrics::DocsumMetrics::~DocsumMetrics() = default;
TransportMetrics::TransportMetrics()
- : metrics::MetricSet("transport", "", "Transport server metrics", 0),
+ : metrics::MetricSet("transport", "", "Transport server metrics", nullptr),
updateLock(),
query(this),
docsum(this)
diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.cpp b/searchlib/src/vespa/searchlib/engine/transportserver.cpp
index c97995db44d..005ef473817 100644
--- a/searchlib/src/vespa/searchlib/engine/transportserver.cpp
+++ b/searchlib/src/vespa/searchlib/engine/transportserver.cpp
@@ -32,7 +32,7 @@ void
TransportServer::SearchHandler::start()
{
SearchReply::UP reply = parent._searchServer.search(std::move(request), *this);
- if (reply.get() != 0) {
+ if (reply) {
searchDone(std::move(reply));
}
}
@@ -40,7 +40,7 @@ TransportServer::SearchHandler::start()
void
TransportServer::SearchHandler::searchDone(SearchReply::UP reply)
{
- if (reply.get() != 0) {
+ if (reply) {
const SearchReply &r = *reply;
if (r.valid) {
if (r.errorCode == 0) {
@@ -86,7 +86,7 @@ void
TransportServer::DocsumHandler::start()
{
DocsumReply::UP reply = parent._docsumServer.getDocsums(std::move(request), *this);
- if (reply.get() != 0) {
+ if (reply) {
getDocsumsDone(std::move(reply));
}
}
@@ -94,7 +94,7 @@ TransportServer::DocsumHandler::start()
void
TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply)
{
- if (reply.get() != 0) {
+ if (reply) {
const DocsumReply &r = *reply;
for (uint32_t i = 0; i < r.docsums.size(); ++i) {
PacketConverter::DOCSUM *p = new PacketConverter::DOCSUM();
@@ -109,9 +109,8 @@ TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply)
logPacket("outgoing packet", p, channel, 0);
}
channel->Send(p);
- if (r.request.get() != NULL) {
- parent.updateDocsumMetrics(r.request->getTimeUsed().sec(),
- r.docsums.size()); // possible thread issue
+ if (r.request) {
+ parent.updateDocsumMetrics(r.request->getTimeUsed().sec(), r.docsums.size());
}
} else {
LOG(warning, "got <null> docsum reply from back-end");
@@ -130,7 +129,7 @@ void
TransportServer::MonitorHandler::start()
{
MonitorReply::UP reply = parent._monitorServer.ping(std::move(request), *this);
- if (reply.get() != 0) {
+ if (reply) {
pingDone(std::move(reply));
}
}
@@ -138,7 +137,7 @@ TransportServer::MonitorHandler::start()
void
TransportServer::MonitorHandler::pingDone(MonitorReply::UP reply)
{
- if (reply.get() != 0) {
+ if (reply) {
const MonitorReply &r = *reply;
PacketConverter::MONITORRESULTX *p = new PacketConverter::MONITORRESULTX();
PacketConverter::fromMonitorReply(r, *p);