summaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-27 08:40:16 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-09-27 08:40:16 +0000
commit8aead1fd3a4cee143ba1c915c7180255fbd0446b (patch)
tree205fa77ac51ee5654ad0d184a8b45879a232cd5b /searchlib
parent2954b2689e7c973ba90c83430406a17b9b5b90fa (diff)
Remove TransportServer
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/CMakeLists.txt1
-rw-r--r--searchlib/src/tests/engine/transportserver/.gitignore5
-rw-r--r--searchlib/src/tests/engine/transportserver/CMakeLists.txt12
-rw-r--r--searchlib/src/tests/engine/transportserver/transportserver_test.cpp232
-rw-r--r--searchlib/src/vespa/searchlib/engine/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/engine/transportserver.cpp464
-rw-r--r--searchlib/src/vespa/searchlib/engine/transportserver.h325
7 files changed, 0 insertions, 1040 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt
index 449580e577b..af16bd08c7a 100644
--- a/searchlib/CMakeLists.txt
+++ b/searchlib/CMakeLists.txt
@@ -124,7 +124,6 @@ vespa_define_module(
src/tests/engine/proto_converter
src/tests/engine/proto_rpc_adapter
src/tests/engine/searchapi
- src/tests/engine/transportserver
src/tests/expression/attributenode
src/tests/features
src/tests/features/beta
diff --git a/searchlib/src/tests/engine/transportserver/.gitignore b/searchlib/src/tests/engine/transportserver/.gitignore
deleted file mode 100644
index 09d836e0004..00000000000
--- a/searchlib/src/tests/engine/transportserver/.gitignore
+++ /dev/null
@@ -1,5 +0,0 @@
-.depend
-Makefile
-transportserver_test
-vlog.txt
-searchlib_transportserver_test_app
diff --git a/searchlib/src/tests/engine/transportserver/CMakeLists.txt b/searchlib/src/tests/engine/transportserver/CMakeLists.txt
deleted file mode 100644
index b4f94884ce4..00000000000
--- a/searchlib/src/tests/engine/transportserver/CMakeLists.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-vespa_add_executable(searchlib_transportserver_test_app TEST
- SOURCES
- transportserver_test.cpp
- DEPENDS
- searchlib
-)
-vespa_add_test(
- NAME searchlib_transportserver_test_app
- COMMAND searchlib_transportserver_test_app
- ENVIRONMENT "VESPA_LOG_TARGET=file:vlog.txt;VESPA_LOG_LEVEL=\"all -spam\""
-)
diff --git a/searchlib/src/tests/engine/transportserver/transportserver_test.cpp b/searchlib/src/tests/engine/transportserver/transportserver_test.cpp
deleted file mode 100644
index baa581c65f9..00000000000
--- a/searchlib/src/tests/engine/transportserver/transportserver_test.cpp
+++ /dev/null
@@ -1,232 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/document/base/documentid.h>
-#include <vespa/searchlib/common/packets.h>
-#include <vespa/searchlib/engine/transportserver.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/fnet.h>
-#include <vespa/searchlib/engine/errorcodes.h>
-#include <thread>
-#include <chrono>
-#include <vespa/log/log.h>
-LOG_SETUP("transportserver_test");
-
-using namespace document;
-using namespace vespalib;
-using namespace search::engine;
-using namespace search::fs4transport;
-
-class SyncServer : public search::engine::SearchServer,
- public search::engine::DocsumServer,
- public search::engine::MonitorServer
-{
-private:
- virtual SearchReply::UP search(SearchRequest::Source request, SearchClient &client) override;
- virtual DocsumReply::UP getDocsums(DocsumRequest::Source request, DocsumClient &client) override;
- virtual MonitorReply::UP ping(MonitorRequest::UP request, MonitorClient &client) override;
-
- SyncServer(const SyncServer &);
- SyncServer &operator=(const SyncServer &);
-public:
- SyncServer() {}
- virtual ~SyncServer() {}
-};
-
-SearchReply::UP
-SyncServer::search(SearchRequest::Source request, SearchClient &)
-{
- // fastos should use steady clock (this may fail)
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- fastos::TimeStamp my_time = fastos::ClockSystem::now();
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- EXPECT_GREATER_EQUAL(my_time.val(), request->getStartTime());
- const SearchRequest &req = *request.get();
- SearchReply::UP reply(new SearchReply());
- SearchReply &ret = *reply;
- ret.request = request.release();
- LOG(info, "responding to search request...");
- ret.offset = req.offset;
- return reply;
-}
-
-DocsumReply::UP
-SyncServer::getDocsums(DocsumRequest::Source request, DocsumClient &)
-{
- // fastos should use steady clock (this may fail)
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- fastos::TimeStamp my_time = fastos::ClockSystem::now();
- std::this_thread::sleep_for(std::chrono::milliseconds(20));
- EXPECT_GREATER_EQUAL(my_time.val(), request->getStartTime());
- DocsumReply::UP reply(new DocsumReply());
- DocsumReply &ret = *reply;
- ret.request = request.release();
- LOG(info, "responding to docsum request...");
- ret.docsums.resize(1);
- ret.docsums[0].setData("data", strlen("data"));
- ret.docsums[0].gid = DocumentId(vespalib::make_string("id:ns:type::100")).getGlobalId();
- return reply;
-}
-
-MonitorReply::UP
-SyncServer::ping(MonitorRequest::UP request, MonitorClient &)
-{
- MonitorRequest &req = *request;
- MonitorReply::UP reply(new MonitorReply());
- MonitorReply &ret = *reply;
- LOG(info, "responding to monitor request...");
- ret.timestamp = req.flags;
- return reply;
-}
-
-TEST("transportserver") {
- {
- SyncServer server;
- TransportServer transport(server, server, server, 0,
- TransportServer::DEBUG_ALL);
- ASSERT_TRUE(transport.start());
- int port = transport.getListenPort();
- ASSERT_TRUE(port > 0);
- {
- FNET_Context ctx;
- FastOS_ThreadPool pool(128 * 1024);
- FNET_Transport client;
- ASSERT_TRUE(client.Start(&pool));
-
- FNET_PacketQueue adminQ;
- FNET_Connection *conn = client.Connect(make_string("tcp/localhost:%d", port).c_str(),
- &FS4PersistentPacketStreamer::Instance, &adminQ);
- ASSERT_TRUE(conn != 0);
- {
- FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX();
- mq->_qflags = 30;
- mq->_features |= MQF_QFLAGS;
- conn->PostPacket(mq, FNET_NOID);
- FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX);
- FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p;
- EXPECT_EQUAL(r->_timestamp, 30u);
- p->Free();
- }
- {
- FNET_PacketQueue q;
- FNET_Channel *ch = conn->OpenChannel(&q, FNET_Context());
- FS4Packet_QUERYX *qx = new FS4Packet_QUERYX();
- qx->_features |= QF_PARSEDQUERY;
- qx->_offset = 100;
- ch->Send(qx);
- FNET_Packet *p = q.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_QUERYRESULTX);
- FS4Packet_QUERYRESULTX *r = (FS4Packet_QUERYRESULTX*)p;
- EXPECT_EQUAL(r->_offset, 100u);
- p->Free();
- ch->CloseAndFree();
- }
- {
- FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX();
- mq->_qflags = 40;
- mq->_features |= MQF_QFLAGS;
- conn->PostPacket(mq, FNET_NOID);
- FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX);
- FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p;
- EXPECT_EQUAL(r->_timestamp, 40u);
- p->Free();
- }
- {
- FNET_PacketQueue q;
- FNET_Channel *ch = conn->OpenChannel(&q, FNET_Context());
- FS4Packet_GETDOCSUMSX *qdx = new FS4Packet_GETDOCSUMSX();
- ch->Send(qdx);
- FNET_Packet *p = q.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_DOCSUM);
- FS4Packet_DOCSUM *r = (FS4Packet_DOCSUM*)p;
- EXPECT_EQUAL(r->getGid(), DocumentId("id:ns:type::100").getGlobalId());
- p->Free();
- p = q.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_EOL);
- p->Free();
- ch->CloseAndFree();
- }
- {
- FS4Packet_MONITORQUERYX *mq = new FS4Packet_MONITORQUERYX();
- mq->_qflags = 50;
- mq->_features |= MQF_QFLAGS;
- conn->PostPacket(mq, FNET_NOID);
- FNET_Packet *p = adminQ.DequeuePacket(60000, &ctx);
- ASSERT_TRUE(p != 0);
- ASSERT_TRUE(p->GetPCODE() == PCODE_MONITORRESULTX);
- FS4Packet_MONITORRESULTX *r = (FS4Packet_MONITORRESULTX*)p;
- EXPECT_EQUAL(r->_timestamp, 50u);
- p->Free();
- }
- // shut down client
- conn->CloseAdminChannel();
- client.Close(conn);
- conn->SubRef();
- client.sync();
- client.ShutDown(true);
- pool.Close();
- }
-
- }
-}
-
-void printError(ErrorCode ecode) {
- fprintf(stderr, "error code %u: '%s'\n", ecode, getStringFromErrorCode(ecode));
-}
-
-TEST("print errors") {
- printError(ECODE_NO_ERROR);
- printError(ECODE_GENERAL_ERROR);
- printError(ECODE_QUERY_PARSE_ERROR);
- printError(ECODE_ALL_PARTITIONS_DOWN);
- printError(ECODE_ILLEGAL_DATASET);
- printError(ECODE_OVERLOADED);
- printError(ECODE_NOT_IMPLEMENTED);
- printError(ECODE_QUERY_NOT_ALLOWED);
- printError(ECODE_TIMEOUT);
-}
-
-TEST("test SearchReply::Coverage") {
- SearchReply::Coverage c;
- EXPECT_EQUAL(0u, c.getActive());
- EXPECT_EQUAL(0u, c.getSoonActive());
- EXPECT_EQUAL(0u, c.getCovered());
- EXPECT_EQUAL(0u, c.getDegradeReason());
-}
-
-TEST("test SearchReply::Coverage(7)") {
- SearchReply::Coverage c(7);
- EXPECT_EQUAL(7u, c.getActive());
- EXPECT_EQUAL(7u, c.getSoonActive());
- EXPECT_EQUAL(7u, c.getCovered());
- EXPECT_EQUAL(0u, c.getDegradeReason());
-}
-
-TEST("test SearchReply::Coverage(7, 19)") {
- SearchReply::Coverage c(19, 7);
- EXPECT_EQUAL(19u, c.getActive());
- EXPECT_EQUAL(19u, c.getSoonActive());
- EXPECT_EQUAL(7u, c.getCovered());
- EXPECT_EQUAL(0u, c.getDegradeReason());
-}
-
-TEST("test SearchReply::Coverage set and get") {
- SearchReply::Coverage c;
- EXPECT_EQUAL(7u, c.setActive(7).getActive());
- EXPECT_EQUAL(9u, c.setSoonActive(9).getSoonActive());
- EXPECT_EQUAL(19u, c.setCovered(19).getCovered());
- EXPECT_EQUAL(5u, c.setDegradeReason(5).getDegradeReason());
- EXPECT_EQUAL(1u, SearchReply::Coverage().degradeMatchPhase().getDegradeReason());
- EXPECT_EQUAL(2u, SearchReply::Coverage().degradeTimeout().getDegradeReason());
- EXPECT_EQUAL(4u, SearchReply::Coverage().degradeAdaptiveTimeout().getDegradeReason());
- EXPECT_EQUAL(7u, SearchReply::Coverage().degradeAdaptiveTimeout().degradeTimeout().degradeMatchPhase().getDegradeReason());
-}
-
-TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt
index 36ff4e12108..303de79ae49 100644
--- a/searchlib/src/vespa/searchlib/engine/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/engine/CMakeLists.txt
@@ -28,7 +28,6 @@ vespa_add_library(searchlib_engine OBJECT
searchrequest.cpp
trace.cpp
transport_metrics.cpp
- transportserver.cpp
${searchlib_engine_PROTOBUF_SRCS}
DEPENDS
)
diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.cpp b/searchlib/src/vespa/searchlib/engine/transportserver.cpp
deleted file mode 100644
index d14735b7770..00000000000
--- a/searchlib/src/vespa/searchlib/engine/transportserver.cpp
+++ /dev/null
@@ -1,464 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "packetconverter.h"
-#include "transportserver.h"
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/channel.h>
-#include <vespa/fnet/connection.h>
-#include <vespa/fnet/connector.h>
-#include <vespa/fnet/iexecutable.h>
-#include <vespa/vespalib/net/crypto_engine.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".engine.transportserver");
-
-namespace search::engine {
-
-namespace {
-
-struct SearchRequestDecoder : SearchRequest::Source::Decoder {
- PacketConverter::QUERYX *packet;
- RelativeTime relative_time;
- SearchRequestDecoder(PacketConverter::QUERYX *qx)
- : packet(qx), relative_time(std::make_unique<FastosClock>()) {}
- std::unique_ptr<SearchRequest> decode() override {
- auto req = std::make_unique<SearchRequest>(std::move(relative_time));
- PacketConverter::toSearchRequest(*packet, *req);
- return req;
- }
- ~SearchRequestDecoder() override { packet->Free(); }
-};
-
-std::unique_ptr<SearchRequest::Source::Decoder> search_request_decoder(PacketConverter::QUERYX *qx) {
- return std::make_unique<SearchRequestDecoder>(qx);
-}
-
-struct DocsumRequestDecoder : DocsumRequest::Source::Decoder {
- PacketConverter::GETDOCSUMSX *packet;
- RelativeTime relative_time;
- DocsumRequestDecoder(PacketConverter::GETDOCSUMSX *gdx)
- : packet(gdx), relative_time(std::make_unique<FastosClock>()) {}
- std::unique_ptr<DocsumRequest> decode() override {
- auto req = std::make_unique<DocsumRequest>(std::move(relative_time), false);
- PacketConverter::toDocsumRequest(*packet, *req);
- return req;
- }
- ~DocsumRequestDecoder() override { packet->Free(); }
-};
-
-std::unique_ptr<DocsumRequest::Source::Decoder> docsum_request_decoder(PacketConverter::GETDOCSUMSX *gdx) {
- return std::make_unique<DocsumRequestDecoder>(gdx);
-}
-
-}
-
-//-----------------------------------------------------------------------------
-
-typedef search::fs4transport::FS4PersistentPacketStreamer PacketStreamer;
-
-//-----------------------------------------------------------------------------
-
-constexpr uint32_t TransportServer::DEBUG_NONE;
-constexpr uint32_t TransportServer::DEBUG_CONNECTION;
-constexpr uint32_t TransportServer::DEBUG_CHANNEL;
-constexpr uint32_t TransportServer::DEBUG_SEARCH;
-constexpr uint32_t TransportServer::DEBUG_DOCSUM;
-constexpr uint32_t TransportServer::DEBUG_MONITOR;
-constexpr uint32_t TransportServer::DEBUG_UNHANDLED;
-constexpr uint32_t TransportServer::DEBUG_ALL;
-
-void
-TransportServer::SearchHandler::start()
-{
- SearchReply::UP reply = parent._searchServer.search(std::move(request), *this);
- if (reply) {
- searchDone(std::move(reply));
- }
-}
-
-void
-TransportServer::SearchHandler::searchDone(SearchReply::UP reply)
-{
- if (reply) {
- const SearchReply &r = *reply;
- if (r.valid) {
- if (r.errorCode == 0) {
- PacketConverter::QUERYRESULTX *p = new PacketConverter::QUERYRESULTX();
- PacketConverter::fromSearchReply(r, *p);
- if (shouldLog(DEBUG_SEARCH)) {
- logPacket("outgoing packet", p, channel, 0);
- }
- channel->Send(p);
- } else {
- PacketConverter::ERROR *p = new PacketConverter::ERROR();
- p->_errorCode = r.errorCode;
- p->setErrorMessage(r.errorMessage);
- if (shouldLog(DEBUG_SEARCH)) {
- logPacket("outgoing packet", p, channel, 0);
- }
- channel->Send(p);
- }
- if (r.request) {
- parent.updateQueryMetrics(r.request->getTimeUsed().sec()); // possible thread issue
- }
- } else {
- PacketConverter::EOL *p = new PacketConverter::EOL();
- if (shouldLog(DEBUG_SEARCH)) {
- logPacket("outgoing packet", p, channel, 0);
- }
- channel->Send(p);
- }
- } else {
- LOG(warning, "got <null> search reply from back-end");
- }
- delete this; // we are done
-}
-
-TransportServer::SearchHandler::~SearchHandler()
-{
- channel->Free();
-}
-
-//-----------------------------------------------------------------------------
-
-void
-TransportServer::DocsumHandler::start()
-{
- DocsumReply::UP reply = parent._docsumServer.getDocsums(std::move(request), *this);
- if (reply) {
- getDocsumsDone(std::move(reply));
- }
-}
-
-void
-TransportServer::DocsumHandler::getDocsumsDone(DocsumReply::UP reply)
-{
- if (reply) {
- const DocsumReply &r = *reply;
- for (uint32_t i = 0; i < r.docsums.size(); ++i) {
- PacketConverter::DOCSUM *p = new PacketConverter::DOCSUM();
- PacketConverter::fromDocsumReplyElement(r.docsums[i], *p);
- if (shouldLog(DEBUG_DOCSUM)) {
- logPacket("outgoing packet", p, channel, 0);
- }
- channel->Send(p);
- }
- PacketConverter::EOL *p = new PacketConverter::EOL();
- if (shouldLog(DEBUG_DOCSUM)) {
- logPacket("outgoing packet", p, channel, 0);
- }
- channel->Send(p);
- if (r.request) {
- parent.updateDocsumMetrics(r.request->getTimeUsed().sec(), r.docsums.size());
- }
- } else {
- LOG(warning, "got <null> docsum reply from back-end");
- }
- delete this; // we are done
-}
-
-TransportServer::DocsumHandler::~DocsumHandler()
-{
- channel->Free();
-}
-
-//-----------------------------------------------------------------------------
-
-void
-TransportServer::MonitorHandler::start()
-{
- MonitorReply::UP reply = parent._monitorServer.ping(std::move(request), *this);
- if (reply) {
- pingDone(std::move(reply));
- }
-}
-
-void
-TransportServer::MonitorHandler::pingDone(MonitorReply::UP reply)
-{
- if (reply) {
- const MonitorReply &r = *reply;
- PacketConverter::MONITORRESULTX *p = new PacketConverter::MONITORRESULTX();
- PacketConverter::fromMonitorReply(r, *p);
- if (shouldLog(DEBUG_MONITOR)) {
- logPacket("outgoing packet", p, 0, connection);
- }
- connection->PostPacket(p, FNET_NOID);
- } else {
- LOG(warning, "got <null> monitor reply from back-end");
- }
- delete this; // we are done
-}
-
-TransportServer::MonitorHandler::~MonitorHandler()
-{
- connection->SubRef();
-}
-
-//-----------------------------------------------------------------------------
-
-FNET_IPacketHandler::HP_RetCode
-TransportServer::HandlePacket(FNET_Packet *packet, FNET_Context context)
-{
- uint32_t pcode = packet->GetPCODE();
- FNET_Channel *channel = context._value.CHANNEL;
- HP_RetCode rc = FNET_FREE_CHANNEL;
-
- if (channel->GetID() == FNET_NOID) { // admin packet
- if (packet->IsChannelLostCMD()) {
- _clients.erase(channel);
- if (shouldLog(DEBUG_CONNECTION)) {
- LOG(debug, "connection closed: tag=%u", channel->GetConnection()->GetContext()._value.INT);
- }
- } else if (pcode == search::fs4transport::PCODE_MONITORQUERYX) {
- const PacketConverter::MONITORQUERYX &mqx = static_cast<PacketConverter::MONITORQUERYX&>(*packet);
- if (shouldLog(DEBUG_MONITOR)) {
- logPacket("incoming packet", packet, channel, 0);
- }
- MonitorRequest::UP req(new MonitorRequest());
- PacketConverter::toMonitorRequest(mqx, *req);
- channel->GetConnection()->AddRef();
- _pending.push(new MonitorHandler(*this, std::move(req), channel->GetConnection()));
- rc = FNET_KEEP_CHANNEL;
- } else if (shouldLog(DEBUG_UNHANDLED)) {
- logPacket("unhandled packet", packet, channel, 0);
- }
- } else { // search/docsum request
- if (pcode == search::fs4transport::PCODE_QUERYX) {
- PacketConverter::QUERYX * qx = static_cast<PacketConverter::QUERYX *>(packet);
- if (shouldLog(DEBUG_SEARCH)) {
- logPacket("incoming packet", packet, channel, 0);
- }
- SearchRequest::Source req(search_request_decoder(qx));
- packet = nullptr;
- _pending.push(new SearchHandler(*this, std::move(req), channel, _clients.size()));
- rc = FNET_CLOSE_CHANNEL;
- } else if (pcode == search::fs4transport::PCODE_GETDOCSUMSX) {
- PacketConverter::GETDOCSUMSX * gdx = static_cast<PacketConverter::GETDOCSUMSX *>(packet);
- if (shouldLog(DEBUG_DOCSUM)) {
- logPacket("incoming packet", packet, channel, 0);
- }
- DocsumRequest::Source req(docsum_request_decoder(gdx));
- packet = nullptr;
- _pending.push(new DocsumHandler(*this, std::move(req), channel));
- rc = FNET_CLOSE_CHANNEL;
- } else if (shouldLog(DEBUG_UNHANDLED)) {
- logPacket("unhandled packet", packet, channel, 0);
- }
- }
- if (packet != nullptr) {
- packet->Free();
- }
- return rc;
-}
-
-bool
-TransportServer::InitAdminChannel(FNET_Channel *channel)
-{
- if (_listener == nullptr) {
- // handle race where we get an incoming connection and
- // disables listening at the 'same time'. Note that sync close
- // is only allowed in the InitAdminChannel method
- channel->GetConnection()->Close(); // sync close
- return false;
- }
- channel->SetContext(channel);
- channel->SetHandler(this);
- assert(_clients.count(channel) == 0);
- _clients.insert(channel);
- channel->GetConnection()->SetContext(FNET_Context(++_connTag));
- if (shouldLog(DEBUG_CONNECTION)) {
- LOG(debug, "connection established: tag=%u", _connTag);
- }
- return true;
-}
-
-bool
-TransportServer::InitChannel(FNET_Channel *channel, uint32_t pcode)
-{
- channel->SetContext(channel);
- channel->SetHandler(this);
- if (shouldLog(DEBUG_CHANNEL)) {
- LOG(debug, "new channel: id=%u, first pcode=%u", channel->GetID(), pcode);
- }
- return true;
-}
-
-void
-TransportServer::Run(FastOS_ThreadInterface *, void *)
-{
- _dispatchTask.ScheduleNow();
- _ready = true;
- _transport.Main(); // <- transport event loop
- _dispatchTask.Kill();
- _listenTask.Kill();
- discardRequests();
-}
-
-bool
-TransportServer::updateListen()
-{
- bool doListen = _doListen;
- if (doListen) {
- if (_listener == nullptr) { // start listening
- _listener = _transport.Listen(_listenSpec.c_str(), &PacketStreamer::Instance, this);
- if (_listener == nullptr) {
- LOG(error, "Could not bind fnet transport socket to %s", _listenSpec.c_str());
- _failed = true;
- return false;
- }
- }
- } else {
- if (_listener != nullptr) { // stop listening
- _transport.Close(_listener); // async close
- _listener->SubRef();
- _listener = nullptr;
- // also close client connections
- std::set<FNET_Channel*>::iterator it = _clients.begin();
- for (; it != _clients.end(); ++it) {
- _transport.Close((*it)->GetConnection()); // async close
- }
- }
- }
- return true;
-}
-
-void
-TransportServer::dispatchRequests()
-{
- while (!_pending.empty()) {
- Handler *h = _pending.front();
- _pending.pop();
- h->start();
- }
-}
-
-void
-TransportServer::discardRequests()
-{
- while (!_pending.empty()) {
- Handler *h = _pending.front();
- _pending.pop();
- delete h;
- }
-}
-
-void
-TransportServer::logPacket(vespalib::stringref msg, FNET_Packet *p, FNET_Channel *ch, FNET_Connection *conn)
-{
- uint32_t chid = -1;
- uint32_t conntag = -1;
- vespalib::string str;
- if (ch != 0) {
- chid = ch->GetID();
- conntag = ch->GetConnection()->GetContext()._value.INT;
- } else if (conn != 0) {
- conntag = conn->GetContext()._value.INT;
- }
- search::fs4transport::FS4Packet *fs4p = dynamic_cast<search::fs4transport::FS4Packet*>(p);
- if (fs4p != 0) {
- str = fs4p->toString(0);
- } else {
- str = vespalib::make_string("packet { pcode=%u }", p->GetPCODE());
- }
- LOG(debug, "%s (chid=%u, conn=%u):\n%s", msg.data(), chid, conntag, str.c_str());
-}
-
-void
-TransportServer::updateQueryMetrics(double latency_s)
-{
- vespalib::LockGuard guard(_metrics.updateLock);
- _metrics.query.count.inc();
- _metrics.query.latency.set(latency_s);
-}
-
-void
-TransportServer::updateDocsumMetrics(double latency_s, uint32_t numDocs)
-{
- vespalib::LockGuard guard(_metrics.updateLock);
- _metrics.docsum.count.inc();
- _metrics.docsum.docs.inc(numDocs);
- _metrics.docsum.latency.set(latency_s);
-}
-
-//-----------------------------------------------------------------------------
-
-bool
-TransportServer::shouldLog(uint32_t msgType) {
- return (((msgType & _debugMask) != 0)
- && ((msgType != DEBUG_MONITOR && LOG_WOULD_LOG(debug)) ||
- (msgType == DEBUG_MONITOR && LOG_WOULD_LOG(spam))));
-}
-
-TransportServer::TransportServer(SearchServer &searchServer,
- DocsumServer &docsumServer,
- MonitorServer &monitorServer,
- int port, uint32_t debugMask)
- : _searchServer(searchServer),
- _docsumServer(docsumServer),
- _monitorServer(monitorServer),
- _transport(std::make_shared<vespalib::NullCryptoEngine>(), 1), // disable encryption
- _ready(false),
- _failed(false),
- _doListen(true),
- _threadPool(256 * 1024),
- _listenSpec(),
- _listener(0),
- _clients(),
- _pending(),
- _dispatchTask(*this),
- _listenTask(*this),
- _connTag(0),
- _debugMask(debugMask),
- _metrics()
-{
- _listenSpec = vespalib::make_string("tcp/%d", port);
-}
-
-bool
-TransportServer::start()
-{
- if (!updateListen()) {
- return false;
- }
- if (_threadPool.NewThread(this) == 0) {
- LOG(error, "Could not start internal transport thread");
- _failed = true;
- return false;
- }
- return true;
-}
-
-int
-TransportServer::getListenPort()
-{
- struct Cmd : public FNET_IExecutable {
- TransportServer &server;
- vespalib::Gate done;
- int port;
- Cmd(TransportServer &s) : server(s), done(), port(-1) {}
- void execute() override {
- if (server._listener != 0) {
- port = server._listener->GetPortNumber();
- }
- done.countDown();
- }
- };
- Cmd cmd(*this);
- if (_transport.execute(&cmd)) {
- cmd.done.await();
- }
- return cmd.port;
-};
-
-TransportServer::~TransportServer()
-{
- shutDown(); // ensure shutdown
- if (_listener != 0) {
- _listener->SubRef();
- _listener = 0;
- }
-}
-
-}
-
diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.h b/searchlib/src/vespa/searchlib/engine/transportserver.h
deleted file mode 100644
index 60bcc79a7fe..00000000000
--- a/searchlib/src/vespa/searchlib/engine/transportserver.h
+++ /dev/null
@@ -1,325 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "transport_metrics.h"
-#include "searchapi.h"
-#include "docsumapi.h"
-#include "monitorapi.h"
-#include <vespa/searchlib/common/packets.h>
-#include <vespa/fnet/iserveradapter.h>
-#include <vespa/fnet/ipackethandler.h>
-#include <vespa/fnet/task.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/vespalib/util/sync.h>
-#include <vespa/fastos/thread.h>
-#include <set>
-#include <queue>
-
-namespace search::engine {
-
-/**
- * Common transport server implementation interacting with the
- * underlying search engine using the common search api. This
- * implementation has less optimization tricks compared to the
- * previous ones being integrated into specific applications.
- **/
-class TransportServer : public FastOS_Runnable,
- public FNET_IServerAdapter,
- public FNET_IPacketHandler
-{
-private:
- TransportServer(const TransportServer &);
- TransportServer &operator=(const TransportServer &);
-
- /**
- * Task used to update listen status
- **/
- struct ListenTask : public FNET_Task
- {
- TransportServer &parent;
- ListenTask(TransportServer &p) : FNET_Task(p._transport.GetScheduler()), parent(p) {}
- void PerformTask() override { parent.updateListen(); }
- };
-
- /**
- * Task used to dispatch incoming requests in an untangled way
- * (aka not in the packet callback).
- **/
- struct DispatchTask : public FNET_Task
- {
- TransportServer &parent;
- DispatchTask(TransportServer &p) : FNET_Task(p._transport.GetScheduler()), parent(p) {}
- void PerformTask() override {
- parent.dispatchRequests();
- ScheduleNow(); // run each tick
- }
- };
-
- struct Handler;
-
- SearchServer &_searchServer;
- DocsumServer &_docsumServer;
- MonitorServer &_monitorServer;
- FNET_Transport _transport;
- bool _ready; // flag indicating initial readyness
- bool _failed; // flag indicating a critical failure
- bool _doListen; // flag telling us to accept requests or not
- FastOS_ThreadPool _threadPool; // thread pool owning transport thread
- vespalib::string _listenSpec; // where to listen; FNET connect spec
- FNET_Connector *_listener; // object accepting incoming connections
- std::set<FNET_Channel*> _clients; // the admin channel of all client connections
- std::queue<Handler*> _pending; // queue of incoming requests not yet started
- DispatchTask _dispatchTask; // task used to dispatch incoming requests
- ListenTask _listenTask; // task used to update listen status
- uint32_t _connTag; // sequential number used to tag connections
- uint32_t _debugMask; // enable more debug logging with this
- TransportMetrics _metrics; // metrics for this transport server
-
- /**
- * Toplevel class used to wrap incoming requests. Actual objects
- * are used both to delay starting the request until we are not in
- * the packet delivery callback and also as the callback target
- * used by the underlying api objects to notify completion of
- * individual requests.
- **/
- struct Handler
- {
- TransportServer &parent;
- uint32_t _debugMask;
- Handler(TransportServer &p) : parent(p), _debugMask(p._debugMask) {}
- bool shouldLog(uint32_t msgType) { return parent.shouldLog(msgType); } // possible thread issue
- virtual void start() = 0;
- virtual ~Handler() {}
- private:
- Handler(const Handler &rhs);
- Handler &operator=(const Handler &rhs);
- };
-
- /**
- * Wrapper for search requests
- **/
- struct SearchHandler : public Handler,
- public SearchClient
- {
- SearchRequest::Source request;
- FNET_Channel *channel;
- uint32_t clientCnt;
-
- SearchHandler(TransportServer &p, SearchRequest::Source req, FNET_Channel *ch, uint32_t cnt)
- : Handler(p), request(std::move(req)), channel(ch), clientCnt(cnt) {}
- void start() override;
- void searchDone(SearchReply::UP reply) override;
- ~SearchHandler();
- };
-
- /**
- * Wrapper for docsum requests
- **/
- struct DocsumHandler : public Handler,
- public DocsumClient
- {
- DocsumRequest::Source request;
- FNET_Channel *channel;
-
- DocsumHandler(TransportServer &p, DocsumRequest::Source req, FNET_Channel *ch)
- : Handler(p), request(std::move(req)), channel(ch) {}
- void start() override;
- void getDocsumsDone(DocsumReply::UP reply) override;
- ~DocsumHandler();
- };
-
- /**
- * Wrapper for monitor requests
- **/
- struct MonitorHandler : public Handler,
- public MonitorClient
- {
- MonitorRequest::UP request;
- FNET_Connection *connection;
-
- MonitorHandler(TransportServer &p, MonitorRequest::UP req, FNET_Connection *conn)
- : Handler(p), request(std::move(req)), connection(conn) {}
- void start() override;
- void pingDone(MonitorReply::UP reply) override;
- ~MonitorHandler();
- };
-
- // handle incoming network packets
- HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) override;
-
- // set up admin channel for new clients
- bool InitAdminChannel(FNET_Channel *channel) override;
-
- // set up channel for individual request
- bool InitChannel(FNET_Channel *channel, uint32_t pcode) override;
-
- // entry point for thread running transport thread
- void Run(FastOS_ThreadInterface *thisThread, void *arg) override;
-
- // update listen status
- bool updateListen();
-
- // dispatch incoming requests
- void dispatchRequests();
-
- // discard any pending requests during shutdown
- void discardRequests();
-
- // convenience method used to log packets
- static void logPacket(vespalib::stringref msg, FNET_Packet *p, FNET_Channel *ch, FNET_Connection *conn);
-
- void updateQueryMetrics(double latency_s);
- void updateDocsumMetrics(double latency_s, uint32_t numDocs);
-
-public:
- /**
- * Convenience typedes.
- */
- typedef std::unique_ptr<TransportServer> UP;
- typedef std::shared_ptr<TransportServer> SP;
-
- /** no debug logging flags set **/
- static constexpr uint32_t DEBUG_NONE = 0x00000000;
-
- /** log connect disconnect from clients **/
- static constexpr uint32_t DEBUG_CONNECTION = 0x00000001;
-
- /** log channel open events **/
- static constexpr uint32_t DEBUG_CHANNEL = 0x00000002;
-
- /** log search related packets **/
- static constexpr uint32_t DEBUG_SEARCH = 0x00000004;
-
- /** log docsum related packets **/
- static constexpr uint32_t DEBUG_DOCSUM = 0x00000008;
-
- /** log monitor related packets **/
- static constexpr uint32_t DEBUG_MONITOR = 0x00000010;
-
- /** log unhandled packets **/
- static constexpr uint32_t DEBUG_UNHANDLED = 0x00000020;
-
- /** all debug logging flags set **/
- static constexpr uint32_t DEBUG_ALL = 0x0000003f;
-
- /**
- * Check if we should log a debug message
- *
- * @return true if we should log a message for this event
- * @param msgType the event we might want to log
- **/
- bool shouldLog(uint32_t msgType);
-
- /**
- * Create a transport server based on the given underlying api
- * objects. An appropriate debug mask can be made by or'ing
- * together the appropriate DEBUG_ constants defined in this
- * class.
- *
- * @param searchServer search api
- * @param docsumServer docsum api
- * @param monitorServer monitor api
- * @param port listen port.
- * @param debugMask mask indicating what information should be logged as debug messages.
- **/
- TransportServer(SearchServer &searchServer,
- DocsumServer &docsumServer,
- MonitorServer &monitorServer,
- int port, uint32_t debugMask = DEBUG_NONE);
-
- /**
- * Obtain the metrics used by this transport server.
- *
- * @return internal metrics
- **/
- TransportMetrics &getMetrics() { return _metrics; }
-
- /**
- * Obtain the listen spec used by this transport server
- *
- * @return listen spec
- **/
- const vespalib::string &getListenSpec() const { return _listenSpec; }
-
- /**
- * Start this server.
- *
- * @return success(true)/failure(false)
- **/
- bool start();
-
- /**
- * Check for initial readyness.
- *
- * @return true if we are ready.
- **/
- bool isReady() const { return _ready; }
-
- /**
- * Check if a critical error has occurred.
- *
- * @return true if something bad has happened.
- **/
- bool isFailed() const { return _failed; }
-
- /**
- * Get a reference to the internal fnet scheduler.
- *
- * @return fnet scheduler
- **/
- FNET_Scheduler &getScheduler() { return *(_transport.GetScheduler()); }
-
- /**
- * Set a flag indicating whether we should accept incoming
- * requests or not. Setting the flag to false will make this
- * server unavailable to any client application.
- *
- * @param listen flag indicating if we should listen
- **/
- void setListen(bool listen) {
- _doListen = listen;
- _listenTask.ScheduleNow();
- }
-
- /**
- * Check which port this server is currently listening to. This
- * method is useful when using automatically allocated port
- * numbers (listening to port 0).
- *
- * @return current listening port number, -1 if not listening.
- **/
- int getListenPort();
-
- /**
- * Enable or disable nagles algorithm.
- *
- * @param noDelay set to true to disable nagles algorithm
- **/
- void setTCPNoDelay(bool noDelay) { _transport.SetTCPNoDelay(noDelay); }
-
- /**
- * Set a limit on how long a connection may be idle before closing it.
- *
- * @param millisecs max idle time in milliseconds
- **/
- void setIdleTimeout(double millisecs) { _transport.SetIOCTimeOut((uint32_t) millisecs); }
-
- /**
- * Shut down this component. This method will block until the
- * transport server has been shut down. After this method returns,
- * no new requests will be generated by this component.
- **/
- void shutDown() {
- _transport.ShutDown(false);
- _threadPool.Close();
- }
-
- /**
- * Destructor will perform shutdown if needed.
- **/
- ~TransportServer();
-};
-
-}
-