diff options
Diffstat (limited to 'searchlib')
7 files changed, 0 insertions, 1040 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt index 449580e577b..af16bd08c7a 100644 --- a/searchlib/CMakeLists.txt +++ b/searchlib/CMakeLists.txt @@ -124,7 +124,6 @@ vespa_define_module( src/tests/engine/proto_converter src/tests/engine/proto_rpc_adapter src/tests/engine/searchapi - src/tests/engine/transportserver src/tests/expression/attributenode src/tests/features src/tests/features/beta diff --git a/searchlib/src/tests/engine/transportserver/.gitignore b/searchlib/src/tests/engine/transportserver/.gitignore deleted file mode 100644 index 09d836e0004..00000000000 --- a/searchlib/src/tests/engine/transportserver/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -.depend -Makefile -transportserver_test -vlog.txt -searchlib_transportserver_test_app diff --git a/searchlib/src/tests/engine/transportserver/CMakeLists.txt b/searchlib/src/tests/engine/transportserver/CMakeLists.txt deleted file mode 100644 index b4f94884ce4..00000000000 --- a/searchlib/src/tests/engine/transportserver/CMakeLists.txt +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(searchlib_transportserver_test_app TEST - SOURCES - transportserver_test.cpp - DEPENDS - searchlib -) -vespa_add_test( - NAME searchlib_transportserver_test_app - COMMAND searchlib_transportserver_test_app - ENVIRONMENT "VESPA_LOG_TARGET=file:vlog.txt;VESPA_LOG_LEVEL=\"all -spam\"" -) diff --git a/searchlib/src/tests/engine/transportserver/transportserver_test.cpp b/searchlib/src/tests/engine/transportserver/transportserver_test.cpp deleted file mode 100644 index baa581c65f9..00000000000 --- a/searchlib/src/tests/engine/transportserver/transportserver_test.cpp +++ /dev/null @@ -1,232 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/document/base/documentid.h> -#include <vespa/searchlib/common/packets.h> -#include <vespa/searchlib/engine/transportserver.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/fnet.h> -#include <vespa/searchlib/engine/errorcodes.h> -#include <thread> -#include <chrono> -#include <vespa/log/log.h> -LOG_SETUP("transportserver_test"); - -using namespace document; -using namespace vespalib; -using namespace search::engine; -using namespace search::fs4transport; - -class SyncServer : public search::engine::SearchServer, - public search::engine::DocsumServer, - public search::engine::MonitorServer -{ -private: - virtual SearchReply::UP search(SearchRequest::Source request, SearchClient &client) override; - virtual DocsumReply::UP getDocsums(DocsumRequest::Source request, DocsumClient &client) override; - virtual MonitorReply::UP ping(MonitorRequest::UP request, MonitorClient &client) override; - - SyncServer(const SyncServer &); - SyncServer &operator=(const SyncServer &); -public: - SyncServer() {} - virtual ~SyncServer() {} -}; - -SearchReply::UP -SyncServer::search(SearchRequest::Source request, SearchClient &) -{ - // fastos should use steady clock (this may fail) - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - fastos::TimeStamp my_time = fastos::ClockSystem::now(); - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - EXPECT_GREATER_EQUAL(my_time.val(), request->getStartTime()); - const SearchRequest &req = *request.get(); - SearchReply::UP reply(new SearchReply()); - SearchReply &ret = *reply; - ret.request = request.release(); - LOG(info, "responding to search request..."); - ret.offset = req.offset; - return reply; -} - -DocsumReply::UP -SyncServer::getDocsums(DocsumRequest::Source request, DocsumClient &) -{ - // fastos should use steady clock (this may fail) - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - fastos::TimeStamp my_time = fastos::ClockSystem::now(); - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - EXPECT_GREATER_EQUAL(my_time.val(), request->getStartTime()); - DocsumReply::UP reply(new DocsumReply()); - DocsumReply &ret = *reply; - ret.request = request.release(); - LOG(info, "responding to docsum request..."); - ret.docsums.resize(1); - ret.docsums[0].setData("data", strlen("data")); - ret.docsums[0].gid = DocumentId(vespalib::make_string("id:ns:type::100")).getGlobalId(); - return reply; -} - -MonitorReply::UP -SyncServer::ping(MonitorRequest::UP request, MonitorClient &) -{ - MonitorRequest &req = *request; - MonitorReply::UP reply(new MonitorReply()); - MonitorReply &ret = *reply; - LOG(info, "responding to monitor request..."); - ret.timestamp = req.flags; - return reply; -} - -TEST("transportserver") { - { - SyncServer server; - TransportServer transport(server, server, server, 0, - TransportServer::DEBUG_ALL); - ASSERT_TRUE(transport.start()); - int port = transport.getListenPort(); - ASSERT_TRUE(port > 0); - { - FNET_Context ctx; - FastOS_ThreadPool pool(128 * 1024); - FNET_Transport client; - ASSERT_TRUE(client.Start(&pool)); - - FNET_PacketQueue adminQ; - FNET_Connection *conn = client.Connect(make_string("tcp/localhost:%d", port).c_str(), - &FS4PersistentPacketStreamer::Instance, &adminQ); - ASSERT_TRUE(conn != 0); - { - FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX(); - mq->_qflags = 30; - mq->_features |= MQF_QFLAGS; - conn->PostPacket(mq, FNET_NOID); - FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX); - FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p; - EXPECT_EQUAL(r->_timestamp, 30u); - p->Free(); - } - { - FNET_PacketQueue q; - FNET_Channel *ch = conn->OpenChannel(&q, FNET_Context()); - FS4Packet_QUERYX *qx = new FS4Packet_QUERYX(); - qx->_features |= QF_PARSEDQUERY; - qx->_offset = 100; - ch->Send(qx); - FNET_Packet *p = q.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_QUERYRESULTX); - FS4Packet_QUERYRESULTX *r = (FS4Packet_QUERYRESULTX*)p; - EXPECT_EQUAL(r->_offset, 100u); - p->Free(); - ch->CloseAndFree(); - } - { - FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX(); - mq->_qflags = 40; - mq->_features |= MQF_QFLAGS; - conn->PostPacket(mq, FNET_NOID); - FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX); - FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p; - EXPECT_EQUAL(r->_timestamp, 40u); - p->Free(); - } - { - FNET_PacketQueue q; - FNET_Channel *ch = conn->OpenChannel(&q, FNET_Context()); - FS4Packet_GETDOCSUMSX *qdx = new FS4Packet_GETDOCSUMSX(); - ch->Send(qdx); - FNET_Packet *p = q.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_DOCSUM); - FS4Packet_DOCSUM *r = (FS4Packet_DOCSUM*)p; - EXPECT_EQUAL(r->getGid(), DocumentId("id:ns:type::100").getGlobalId()); - p->Free(); - p = q.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_EOL); - p->Free(); - ch->CloseAndFree(); - } - { - FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX(); - mq->_qflags = 50; - mq->_features |= MQF_QFLAGS; - conn->PostPacket(mq, FNET_NOID); - FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx); - ASSERT_TRUE(p != 0); - ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX); - FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p; - EXPECT_EQUAL(r->_timestamp, 50u); - p->Free(); - } - // shut down client - conn->CloseAdminChannel(); - client.Close(conn); - conn->SubRef(); - client.sync(); - client.ShutDown(true); - pool.Close(); - } - - } -} - -void printError(ErrorCode ecode) { - fprintf(stderr, "error code %u: '%s'\n", ecode, getStringFromErrorCode(ecode)); -} - -TEST("print errors") { - printError(ECODE_NO_ERROR); - printError(ECODE_GENERAL_ERROR); - printError(ECODE_QUERY_PARSE_ERROR); - printError(ECODE_ALL_PARTITIONS_DOWN); - printError(ECODE_ILLEGAL_DATASET); - printError(ECODE_OVERLOADED); - printError(ECODE_NOT_IMPLEMENTED); - printError(ECODE_QUERY_NOT_ALLOWED); - printError(ECODE_TIMEOUT); -} - -TEST("test SearchReply::Coverage") { - SearchReply::Coverage c; - EXPECT_EQUAL(0u, c.getActive()); - EXPECT_EQUAL(0u, c.getSoonActive()); - EXPECT_EQUAL(0u, c.getCovered()); - EXPECT_EQUAL(0u, c.getDegradeReason()); -} - -TEST("test SearchReply::Coverage(7)") { - SearchReply::Coverage c(7); - EXPECT_EQUAL(7u, c.getActive()); - EXPECT_EQUAL(7u, c.getSoonActive()); - EXPECT_EQUAL(7u, c.getCovered()); - EXPECT_EQUAL(0u, c.getDegradeReason()); -} - -TEST("test SearchReply::Coverage(7, 19)") { - SearchReply::Coverage c(19, 7); - EXPECT_EQUAL(19u, c.getActive()); - EXPECT_EQUAL(19u, c.getSoonActive()); - EXPECT_EQUAL(7u, c.getCovered()); - EXPECT_EQUAL(0u, c.getDegradeReason()); -} - -TEST("test SearchReply::Coverage set and get") { - SearchReply::Coverage c; - EXPECT_EQUAL(7u, c.setActive(7).getActive()); - EXPECT_EQUAL(9u, c.setSoonActive(9).getSoonActive()); - EXPECT_EQUAL(19u, c.setCovered(19).getCovered()); - EXPECT_EQUAL(5u, c.setDegradeReason(5).getDegradeReason()); - EXPECT_EQUAL(1u, SearchReply::Coverage().degradeMatchPhase().getDegradeReason()); - EXPECT_EQUAL(2u, SearchReply::Coverage().degradeTimeout().getDegradeReason()); - EXPECT_EQUAL(4u, SearchReply::Coverage().degradeAdaptiveTimeout().getDegradeReason()); - EXPECT_EQUAL(7u, SearchReply::Coverage().degradeAdaptiveTimeout().degradeTimeout().degradeMatchPhase().getDegradeReason()); -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt index 36ff4e12108..303de79ae49 100644 --- a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt @@ -28,7 +28,6 @@ vespa_add_library(searchlib_engine OBJECT searchrequest.cpp trace.cpp transport_metrics.cpp - transportserver.cpp ${searchlib_engine_PROTOBUF_SRCS} DEPENDS ) diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.cpp b/searchlib/src/vespa/searchlib/engine/transportserver.cpp deleted file mode 100644 index d14735b7770..00000000000 --- a/searchlib/src/vespa/searchlib/engine/transportserver.cpp +++ /dev/null @@ -1,464 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "packetconverter.h" -#include "transportserver.h" -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/channel.h> -#include <vespa/fnet/connection.h> -#include <vespa/fnet/connector.h> -#include <vespa/fnet/iexecutable.h> -#include <vespa/vespalib/net/crypto_engine.h> - -#include <vespa/log/log.h> -LOG_SETUP(".engine.transportserver"); - -namespace search::engine { - -namespace { - -struct SearchRequestDecoder : SearchRequest::Source::Decoder { - PacketConverter::QUERYX *packet; - RelativeTime relative_time; - SearchRequestDecoder(PacketConverter::QUERYX *qx) - : packet(qx), relative_time(std::make_unique<FastosClock>()) {} - std::unique_ptr<SearchRequest> decode() override { - auto req = std::make_unique<SearchRequest>(std::move(relative_time)); - PacketConverter::toSearchRequest(*packet, *req); - return req; - } - ~SearchRequestDecoder() override { packet->Free(); } -}; - -std::unique_ptr<SearchRequest::Source::Decoder> search_request_decoder(PacketConverter::QUERYX *qx) { - return std::make_unique<SearchRequestDecoder>(qx); -} - -struct DocsumRequestDecoder : DocsumRequest::Source::Decoder { - PacketConverter::GETDOCSUMSX *packet; - RelativeTime relative_time; - DocsumRequestDecoder(PacketConverter::GETDOCSUMSX *gdx) - : packet(gdx), relative_time(std::make_unique<FastosClock>()) {} - std::unique_ptr<DocsumRequest> decode() override { - auto req = std::make_unique<DocsumRequest>(std::move(relative_time), false); - PacketConverter::toDocsumRequest(*packet, *req); - return req; - } - ~DocsumRequestDecoder() override { packet->Free(); } -}; - -std::unique_ptr<DocsumRequest::Source::Decoder> docsum_request_decoder(PacketConverter::GETDOCSUMSX *gdx) { - return std::make_unique<DocsumRequestDecoder>(gdx); -} - -} - -//----------------------------------------------------------------------------- - -typedef search::fs4transport::FS4PersistentPacketStreamer PacketStreamer; - -//----------------------------------------------------------------------------- - -constexpr uint32_t TransportServer::DEBUG_NONE; -constexpr uint32_t TransportServer::DEBUG_CONNECTION; -constexpr uint32_t TransportServer::DEBUG_CHANNEL; -constexpr uint32_t TransportServer::DEBUG_SEARCH; -constexpr uint32_t TransportServer::DEBUG_DOCSUM; -constexpr uint32_t TransportServer::DEBUG_MONITOR; -constexpr uint32_t TransportServer::DEBUG_UNHANDLED; -constexpr uint32_t TransportServer::DEBUG_ALL; - -void -TransportServer::SearchHandler::start() -{ - SearchReply::UP reply = parent._searchServer.search(std::move(request), *this); - if (reply) { - searchDone(std::move(reply)); - } -} - -void -TransportServer::SearchHandler::searchDone(SearchReply::UP reply) -{ - if (reply) { - const SearchReply &r = *reply; - if (r.valid) { - if (r.errorCode == 0) { - PacketConverter::QUERYRESULTX *p = new PacketConverter::QUERYRESULTX(); - PacketConverter::fromSearchReply(r, *p); - if (shouldLog(DEBUG_SEARCH)) { - logPacket("outgoing packet", p, channel, 0); - } - channel->Send(p); - } else { - PacketConverter::ERROR *p = new PacketConverter::ERROR(); - p->_errorCode = r.errorCode; - p->setErrorMessage(r.errorMessage); - if (shouldLog(DEBUG_SEARCH)) { - logPacket("outgoing packet", p, channel, 0); - } - channel->Send(p); - } - if (r.request) { - parent.updateQueryMetrics(r.request->getTimeUsed().sec()); // possible thread issue - } - } else { - PacketConverter::EOL *p = new PacketConverter::EOL(); - if (shouldLog(DEBUG_SEARCH)) { - logPacket("outgoing packet", p, channel, 0); - } - channel->Send(p); - } - } else { - LOG(warning, "got <null> search reply from back-end"); - } - delete this; // we are done -} - -TransportServer::SearchHandler::~SearchHandler() -{ - channel->Free(); -} - -//----------------------------------------------------------------------------- - -void -TransportServer::DocsumHandler::start() -{ - DocsumReply::UP reply = parent._docsumServer.getDocsums(std::move(request), *this); - if (reply) { - getDocsumsDone(std::move(reply)); - } -} - -void -TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply) -{ - if (reply) { - const DocsumReply &r = *reply; - for (uint32_t i = 0; i < r.docsums.size(); ++i) { - PacketConverter::DOCSUM *p = new PacketConverter::DOCSUM(); - PacketConverter::fromDocsumReplyElement(r.docsums[i], *p); - if (shouldLog(DEBUG_DOCSUM)) { - logPacket("outgoing packet", p, channel, 0); - } - channel->Send(p); - } - PacketConverter::EOL *p = new PacketConverter::EOL(); - if (shouldLog(DEBUG_DOCSUM)) { - logPacket("outgoing packet", p, channel, 0); - } - channel->Send(p); - if (r.request) { - parent.updateDocsumMetrics(r.request->getTimeUsed().sec(), r.docsums.size()); - } - } else { - LOG(warning, "got <null> docsum reply from back-end"); - } - delete this; // we are done -} - -TransportServer::DocsumHandler::~DocsumHandler() -{ - channel->Free(); -} - -//----------------------------------------------------------------------------- - -void -TransportServer::MonitorHandler::start() -{ - MonitorReply::UP reply = parent._monitorServer.ping(std::move(request), *this); - if (reply) { - pingDone(std::move(reply)); - } -} - -void -TransportServer::MonitorHandler::pingDone(MonitorReply::UP reply) -{ - if (reply) { - const MonitorReply &r = *reply; - PacketConverter::MONITORRESULTX *p = new PacketConverter::MONITORRESULTX(); - PacketConverter::fromMonitorReply(r, *p); - if (shouldLog(DEBUG_MONITOR)) { - logPacket("outgoing packet", p, 0, connection); - } - connection->PostPacket(p, FNET_NOID); - } else { - LOG(warning, "got <null> monitor reply from back-end"); - } - delete this; // we are done -} - -TransportServer::MonitorHandler::~MonitorHandler() -{ - connection->SubRef(); -} - -//----------------------------------------------------------------------------- - -FNET_IPacketHandler::HP_RetCode -TransportServer::HandlePacket(FNET_Packet *packet, FNET_Context context) -{ - uint32_t pcode = packet->GetPCODE(); - FNET_Channel *channel = context._value.CHANNEL; - HP_RetCode rc = FNET_FREE_CHANNEL; - - if (channel->GetID() == FNET_NOID) { // admin packet - if (packet->IsChannelLostCMD()) { - _clients.erase(channel); - if (shouldLog(DEBUG_CONNECTION)) { - LOG(debug, "connection closed: tag=%u", channel->GetConnection()->GetContext()._value.INT); - } - } else if (pcode == search::fs4transport::PCODE_MONITORQUERYX) { - const PacketConverter::MONITORQUERYX &mqx = static_cast<PacketConverter::MONITORQUERYX&>(*packet); - if (shouldLog(DEBUG_MONITOR)) { - logPacket("incoming packet", packet, channel, 0); - } - MonitorRequest::UP req(new MonitorRequest()); - PacketConverter::toMonitorRequest(mqx, *req); - channel->GetConnection()->AddRef(); - _pending.push(new MonitorHandler(*this, std::move(req), channel->GetConnection())); - rc = FNET_KEEP_CHANNEL; - } else if (shouldLog(DEBUG_UNHANDLED)) { - logPacket("unhandled packet", packet, channel, 0); - } - } else { // search/docsum request - if (pcode == search::fs4transport::PCODE_QUERYX) { - PacketConverter::QUERYX * qx = static_cast<PacketConverter::QUERYX *>(packet); - if (shouldLog(DEBUG_SEARCH)) { - logPacket("incoming packet", packet, channel, 0); - } - SearchRequest::Source req(search_request_decoder(qx)); - packet = nullptr; - _pending.push(new SearchHandler(*this, std::move(req), channel, _clients.size())); - rc = FNET_CLOSE_CHANNEL; - } else if (pcode == search::fs4transport::PCODE_GETDOCSUMSX) { - PacketConverter::GETDOCSUMSX * gdx = static_cast<PacketConverter::GETDOCSUMSX *>(packet); - if (shouldLog(DEBUG_DOCSUM)) { - logPacket("incoming packet", packet, channel, 0); - } - DocsumRequest::Source req(docsum_request_decoder(gdx)); - packet = nullptr; - _pending.push(new DocsumHandler(*this, std::move(req), channel)); - rc = FNET_CLOSE_CHANNEL; - } else if (shouldLog(DEBUG_UNHANDLED)) { - logPacket("unhandled packet", packet, channel, 0); - } - } - if (packet != nullptr) { - packet->Free(); - } - return rc; -} - -bool -TransportServer::InitAdminChannel(FNET_Channel *channel) -{ - if (_listener == nullptr) { - // handle race where we get an incoming connection and - // disables listening at the 'same time'. Note that sync close - // is only allowed in the InitAdminChannel method - channel->GetConnection()->Close(); // sync close - return false; - } - channel->SetContext(channel); - channel->SetHandler(this); - assert(_clients.count(channel) == 0); - _clients.insert(channel); - channel->GetConnection()->SetContext(FNET_Context(++_connTag)); - if (shouldLog(DEBUG_CONNECTION)) { - LOG(debug, "connection established: tag=%u", _connTag); - } - return true; -} - -bool -TransportServer::InitChannel(FNET_Channel *channel, uint32_t pcode) -{ - channel->SetContext(channel); - channel->SetHandler(this); - if (shouldLog(DEBUG_CHANNEL)) { - LOG(debug, "new channel: id=%u, first pcode=%u", channel->GetID(), pcode); - } - return true; -} - -void -TransportServer::Run(FastOS_ThreadInterface *, void *) -{ - _dispatchTask.ScheduleNow(); - _ready = true; - _transport.Main(); // <- transport event loop - _dispatchTask.Kill(); - _listenTask.Kill(); - discardRequests(); -} - -bool -TransportServer::updateListen() -{ - bool doListen = _doListen; - if (doListen) { - if (_listener == nullptr) { // start listening - _listener = _transport.Listen(_listenSpec.c_str(), &PacketStreamer::Instance, this); - if (_listener == nullptr) { - LOG(error, "Could not bind fnet transport socket to %s", _listenSpec.c_str()); - _failed = true; - return false; - } - } - } else { - if (_listener != nullptr) { // stop listening - _transport.Close(_listener); // async close - _listener->SubRef(); - _listener = nullptr; - // also close client connections - std::set<FNET_Channel*>::iterator it = _clients.begin(); - for (; it != _clients.end(); ++it) { - _transport.Close((*it)->GetConnection()); // async close - } - } - } - return true; -} - -void -TransportServer::dispatchRequests() -{ - while (!_pending.empty()) { - Handler *h = _pending.front(); - _pending.pop(); - h->start(); - } -} - -void -TransportServer::discardRequests() -{ - while (!_pending.empty()) { - Handler *h = _pending.front(); - _pending.pop(); - delete h; - } -} - -void -TransportServer::logPacket(vespalib::stringref msg, FNET_Packet *p, FNET_Channel *ch, FNET_Connection *conn) -{ - uint32_t chid = -1; - uint32_t conntag = -1; - vespalib::string str; - if (ch != 0) { - chid = ch->GetID(); - conntag = ch->GetConnection()->GetContext()._value.INT; - } else if (conn != 0) { - conntag = conn->GetContext()._value.INT; - } - search::fs4transport::FS4Packet *fs4p = dynamic_cast<search::fs4transport::FS4Packet*>(p); - if (fs4p != 0) { - str = fs4p->toString(0); - } else { - str = vespalib::make_string("packet { pcode=%u }", p->GetPCODE()); - } - LOG(debug, "%s (chid=%u, conn=%u):\n%s", msg.data(), chid, conntag, str.c_str()); -} - -void -TransportServer::updateQueryMetrics(double latency_s) -{ - vespalib::LockGuard guard(_metrics.updateLock); - _metrics.query.count.inc(); - _metrics.query.latency.set(latency_s); -} - -void -TransportServer::updateDocsumMetrics(double latency_s, uint32_t numDocs) -{ - vespalib::LockGuard guard(_metrics.updateLock); - _metrics.docsum.count.inc(); - _metrics.docsum.docs.inc(numDocs); - _metrics.docsum.latency.set(latency_s); -} - -//----------------------------------------------------------------------------- - -bool -TransportServer::shouldLog(uint32_t msgType) { - return (((msgType & _debugMask) != 0) - && ((msgType != DEBUG_MONITOR && LOG_WOULD_LOG(debug)) || - (msgType == DEBUG_MONITOR && LOG_WOULD_LOG(spam)))); -} - -TransportServer::TransportServer(SearchServer &searchServer, - DocsumServer &docsumServer, - MonitorServer &monitorServer, - int port, uint32_t debugMask) - : _searchServer(searchServer), - _docsumServer(docsumServer), - _monitorServer(monitorServer), - _transport(std::make_shared<vespalib::NullCryptoEngine>(), 1), // disable encryption - _ready(false), - _failed(false), - _doListen(true), - _threadPool(256 * 1024), - _listenSpec(), - _listener(0), - _clients(), - _pending(), - _dispatchTask(*this), - _listenTask(*this), - _connTag(0), - _debugMask(debugMask), - _metrics() -{ - _listenSpec = vespalib::make_string("tcp/%d", port); -} - -bool -TransportServer::start() -{ - if (!updateListen()) { - return false; - } - if (_threadPool.NewThread(this) == 0) { - LOG(error, "Could not start internal transport thread"); - _failed = true; - return false; - } - return true; -} - -int -TransportServer::getListenPort() -{ - struct Cmd : public FNET_IExecutable { - TransportServer &server; - vespalib::Gate done; - int port; - Cmd(TransportServer &s) : server(s), done(), port(-1) {} - void execute() override { - if (server._listener != 0) { - port = server._listener->GetPortNumber(); - } - done.countDown(); - } - }; - Cmd cmd(*this); - if (_transport.execute(&cmd)) { - cmd.done.await(); - } - return cmd.port; -}; - -TransportServer::~TransportServer() -{ - shutDown(); // ensure shutdown - if (_listener != 0) { - _listener->SubRef(); - _listener = 0; - } -} - -} - diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.h b/searchlib/src/vespa/searchlib/engine/transportserver.h deleted file mode 100644 index 60bcc79a7fe..00000000000 --- a/searchlib/src/vespa/searchlib/engine/transportserver.h +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "transport_metrics.h" -#include "searchapi.h" -#include "docsumapi.h" -#include "monitorapi.h" -#include <vespa/searchlib/common/packets.h> -#include <vespa/fnet/iserveradapter.h> -#include <vespa/fnet/ipackethandler.h> -#include <vespa/fnet/task.h> -#include <vespa/fnet/transport.h> -#include <vespa/vespalib/util/sync.h> -#include <vespa/fastos/thread.h> -#include <set> -#include <queue> - -namespace search::engine { - -/** - * Common transport server implementation interacting with the - * underlying search engine using the common search api. This - * implementation has less optimization tricks compared to the - * previous ones being integrated into specific applications. - **/ -class TransportServer : public FastOS_Runnable, - public FNET_IServerAdapter, - public FNET_IPacketHandler -{ -private: - TransportServer(const TransportServer &); - TransportServer &operator=(const TransportServer &); - - /** - * Task used to update listen status - **/ - struct ListenTask : public FNET_Task - { - TransportServer &parent; - ListenTask(TransportServer &p) : FNET_Task(p._transport.GetScheduler()), parent(p) {} - void PerformTask() override { parent.updateListen(); } - }; - - /** - * Task used to dispatch incoming requests in an untangled way - * (aka not in the packet callback). - **/ - struct DispatchTask : public FNET_Task - { - TransportServer &parent; - DispatchTask(TransportServer &p) : FNET_Task(p._transport.GetScheduler()), parent(p) {} - void PerformTask() override { - parent.dispatchRequests(); - ScheduleNow(); // run each tick - } - }; - - struct Handler; - - SearchServer &_searchServer; - DocsumServer &_docsumServer; - MonitorServer &_monitorServer; - FNET_Transport _transport; - bool _ready; // flag indicating initial readyness - bool _failed; // flag indicating a critical failure - bool _doListen; // flag telling us to accept requests or not - FastOS_ThreadPool _threadPool; // thread pool owning transport thread - vespalib::string _listenSpec; // where to listen; FNET connect spec - FNET_Connector *_listener; // object accepting incoming connections - std::set<FNET_Channel*> _clients; // the admin channel of all client connections - std::queue<Handler*> _pending; // queue of incoming requests not yet started - DispatchTask _dispatchTask; // task used to dispatch incoming requests - ListenTask _listenTask; // task used to update listen status - uint32_t _connTag; // sequential number used to tag connections - uint32_t _debugMask; // enable more debug logging with this - TransportMetrics _metrics; // metrics for this transport server - - /** - * Toplevel class used to wrap incoming requests. Actual objects - * are used both to delay starting the request until we are not in - * the packet delivery callback and also as the callback target - * used by the underlying api objects to notify completion of - * individual requests. - **/ - struct Handler - { - TransportServer &parent; - uint32_t _debugMask; - Handler(TransportServer &p) : parent(p), _debugMask(p._debugMask) {} - bool shouldLog(uint32_t msgType) { return parent.shouldLog(msgType); } // possible thread issue - virtual void start() = 0; - virtual ~Handler() {} - private: - Handler(const Handler &rhs); - Handler &operator=(const Handler &rhs); - }; - - /** - * Wrapper for search requests - **/ - struct SearchHandler : public Handler, - public SearchClient - { - SearchRequest::Source request; - FNET_Channel *channel; - uint32_t clientCnt; - - SearchHandler(TransportServer &p, SearchRequest::Source req, FNET_Channel *ch, uint32_t cnt) - : Handler(p), request(std::move(req)), channel(ch), clientCnt(cnt) {} - void start() override; - void searchDone(SearchReply::UP reply) override; - ~SearchHandler(); - }; - - /** - * Wrapper for docsum requests - **/ - struct DocsumHandler : public Handler, - public DocsumClient - { - DocsumRequest::Source request; - FNET_Channel *channel; - - DocsumHandler(TransportServer &p, DocsumRequest::Source req, FNET_Channel *ch) - : Handler(p), request(std::move(req)), channel(ch) {} - void start() override; - void getDocsumsDone(DocsumReply::UP reply) override; - ~DocsumHandler(); - }; - - /** - * Wrapper for monitor requests - **/ - struct MonitorHandler : public Handler, - public MonitorClient - { - MonitorRequest::UP request; - FNET_Connection *connection; - - MonitorHandler(TransportServer &p, MonitorRequest::UP req, FNET_Connection *conn) - : Handler(p), request(std::move(req)), connection(conn) {} - void start() override; - void pingDone(MonitorReply::UP reply) override; - ~MonitorHandler(); - }; - - // handle incoming network packets - HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override; - - // set up admin channel for new clients - bool InitAdminChannel(FNET_Channel *channel) override; - - // set up channel for individual request - bool InitChannel(FNET_Channel *channel, uint32_t pcode) override; - - // entry point for thread running transport thread - void Run(FastOS_ThreadInterface *thisThread, void *arg) override; - - // update listen status - bool updateListen(); - - // dispatch incoming requests - void dispatchRequests(); - - // discard any pending requests during shutdown - void discardRequests(); - - // convenience method used to log packets - static void logPacket(vespalib::stringref msg, FNET_Packet *p, FNET_Channel *ch, FNET_Connection *conn); - - void updateQueryMetrics(double latency_s); - void updateDocsumMetrics(double latency_s, uint32_t numDocs); - -public: - /** - * Convenience typedes. - */ - typedef std::unique_ptr<TransportServer> UP; - typedef std::shared_ptr<TransportServer> SP; - - /** no debug logging flags set **/ - static constexpr uint32_t DEBUG_NONE = 0x00000000; - - /** log connect disconnect from clients **/ - static constexpr uint32_t DEBUG_CONNECTION = 0x00000001; - - /** log channel open events **/ - static constexpr uint32_t DEBUG_CHANNEL = 0x00000002; - - /** log search related packets **/ - static constexpr uint32_t DEBUG_SEARCH = 0x00000004; - - /** log docsum related packets **/ - static constexpr uint32_t DEBUG_DOCSUM = 0x00000008; - - /** log monitor related packets **/ - static constexpr uint32_t DEBUG_MONITOR = 0x00000010; - - /** log unhandled packets **/ - static constexpr uint32_t DEBUG_UNHANDLED = 0x00000020; - - /** all debug logging flags set **/ - static constexpr uint32_t DEBUG_ALL = 0x0000003f; - - /** - * Check if we should log a debug message - * - * @return true if we should log a message for this event - * @param msgType the event we might want to log - **/ - bool shouldLog(uint32_t msgType); - - /** - * Create a transport server based on the given underlying api - * objects. An appropriate debug mask can be made by or'ing - * together the appropriate DEBUG_ constants defined in this - * class. - * - * @param searchServer search api - * @param docsumServer docsum api - * @param monitorServer monitor api - * @param port listen port. - * @param debugMask mask indicating what information should be logged as debug messages. - **/ - TransportServer(SearchServer &searchServer, - DocsumServer &docsumServer, - MonitorServer &monitorServer, - int port, uint32_t debugMask = DEBUG_NONE); - - /** - * Obtain the metrics used by this transport server. - * - * @return internal metrics - **/ - TransportMetrics &getMetrics() { return _metrics; } - - /** - * Obtain the listen spec used by this transport server - * - * @return listen spec - **/ - const vespalib::string &getListenSpec() const { return _listenSpec; } - - /** - * Start this server. - * - * @return success(true)/failure(false) - **/ - bool start(); - - /** - * Check for initial readyness. - * - * @return true if we are ready. - **/ - bool isReady() const { return _ready; } - - /** - * Check if a critical error has occurred. - * - * @return true if something bad has happened. - **/ - bool isFailed() const { return _failed; } - - /** - * Get a reference to the internal fnet scheduler. - * - * @return fnet scheduler - **/ - FNET_Scheduler &getScheduler() { return *(_transport.GetScheduler()); } - - /** - * Set a flag indicating whether we should accept incoming - * requests or not. Setting the flag to false will make this - * server unavailable to any client application. - * - * @param listen flag indicating if we should listen - **/ - void setListen(bool listen) { - _doListen = listen; - _listenTask.ScheduleNow(); - } - - /** - * Check which port this server is currently listening to. This - * method is useful when using automatically allocated port - * numbers (listening to port 0). - * - * @return current listening port number, -1 if not listening. - **/ - int getListenPort(); - - /** - * Enable or disable nagles algorithm. - * - * @param noDelay set to true to disable nagles algorithm - **/ - void setTCPNoDelay(bool noDelay) { _transport.SetTCPNoDelay(noDelay); } - - /** - * Set a limit on how long a connection may be idle before closing it. - * - * @param millisecs max idle time in milliseconds - **/ - void setIdleTimeout(double millisecs) { _transport.SetIOCTimeOut((uint32_t) millisecs); } - - /** - * Shut down this component. This method will block until the - * transport server has been shut down. After this method returns, - * no new requests will be generated by this component. - **/ - void shutDown() { - _transport.ShutDown(false); - _threadPool.Close(); - } - - /** - * Destructor will perform shutdown if needed. - **/ - ~TransportServer(); -}; - -} - |