diff options
author | Arne H Juul <arnej27959@users.noreply.github.com> | 2016-06-17 22:34:46 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-17 22:34:46 +0200 |
commit | 4492247bff82d0a4b71cd6e40daa8561f21895b4 (patch) | |
tree | 9dd1053ab011b321fe2e2e88e659af023727ad15 /searchcore | |
parent | 3bba0952465891fb3efb941e1b9f0d3b39eb7265 (diff) | |
parent | 92e27feb0edb017826ebe8d33dbf882b1c9f6f23 (diff) |
Merge pull request #47 from yahoo/balder/make-tld-restart-on-port-changes
Balder/make tld restart on port changes
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp | 162 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h | 46 |
2 files changed, 107 insertions, 101 deletions
diff --git a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp index 193c4365189..6e06c91a902 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp @@ -31,18 +31,12 @@ using document::CompressionConfig; char FastS_VersionTag[] = V_TAG; -int FastS_verbose = 0; -/** @todo Use a config file for these variables */ -int FastS_nsearches; -double FastS_searchtime; - - namespace fdispatch { FastS_FNETAdapter::FastS_FNETAdapter(FastS_AppContext *appCtx) : _appCtx(appCtx), - _nodeManager(NULL), + _nodeManager(), _timeKeeper(NULL), _transport(NULL), _last_now(0.0), @@ -92,57 +86,57 @@ FastS_FNETAdapter::fini() Fdispatch::~Fdispatch(void) { - if (_transportServer != NULL) { + if (_transportServer) { _transportServer->shutDown(); // sync shutdown } _FNET_adapter.fini(); - if (_nodeManager != NULL) + if (_nodeManager) { _nodeManager->ShutdownConfig(); - if (_transport != NULL && _transportStarted) + } + if (_transport && _transportStarted) { _transport->ShutDown(true); // sync shutdown - if (_rpc != NULL) + } + if (_rpc) { _rpc->ShutDown(); // sync shutdown + } LOG(debug, "Will close threadpool"); _mypool->Close(); LOG(debug, "Has closed threadpool"); - delete _transportServer; - delete _engineAdapter; - delete _nodeManager; - if (_transport != NULL) { - delete _transport; - } - delete _rpc; - delete _mypool; + _transportServer.reset(); + _engineAdapter.reset(); + _nodeManager.reset(); + _transport.reset(); + _rpc.reset(); + _mypool.reset(); } FNET_Transport * Fdispatch::GetFNETTransport() { - return _transport; + return _transport.get(); } FNET_Scheduler * Fdispatch::GetFNETScheduler() { - return (_transport == NULL) ? - NULL : _transport->GetScheduler(); + return ( ! _transport) ? NULL : _transport->GetScheduler(); } FastS_NodeManager * Fdispatch::GetNodeManager() { - return _nodeManager; + return _nodeManager.get(); } FastS_DataSetCollection * Fdispatch::GetDataSetCollection() { - if (_nodeManager == NULL) + if ( ! _nodeManager) return NULL; return _nodeManager->GetDataSetCollection(); } @@ -151,14 +145,14 @@ Fdispatch::GetDataSetCollection() FastOS_ThreadPool * Fdispatch::GetThreadPool() { - return _mypool; + return _mypool.get(); } bool Fdispatch::Failed(void) { - return ( (_transportServer != NULL && _transportServer->isFailed())); + return ( (_transportServer && _transportServer->isFailed())) || _needRestart; } @@ -204,7 +198,7 @@ Fdispatch::CheckTempFail(void) if (failflag == _tempFail) return ret; - if (_transportServer != NULL) { + if (_transportServer) { if (failflag) { _transportServer->setListen(false); LOG(error, "Disabling fnet server interface"); @@ -223,16 +217,18 @@ Fdispatch::CheckTempFail(void) * Set up stuff as specified in the fdispatch-rc-file. */ Fdispatch::Fdispatch(const config::ConfigUri &configUri) - : _mypool(NULL), - _engineAdapter(NULL), - _transportServer(NULL), + : _mypool(), + _engineAdapter(), + _transportServer(), _componentConfig(), - _nodeManager(NULL), - _transport(NULL), + _nodeManager(), + _transport(), _FNET_adapter(this), - _rpc(NULL), + _rpc(), _config(), _configUri(configUri), + _fdispatchrcFetcher(configUri.getContext()), + _rndGen(), _partition(0), _tempFail(false), _FNETLiveCounterDanger(false), @@ -243,32 +239,51 @@ Fdispatch::Fdispatch(const config::ConfigUri &configUri) _FNETLiveCounterDangerStart(), _timeouts(0u), _checkLimit(0u), - _healthPort(0) + _healthPort(0), + _needRestart(false) { int64_t cfgGen = -1; _config = config::ConfigGetter<FdispatchrcConfig>:: - getConfig(cfgGen, - _configUri.getConfigId(), - _configUri.getContext()); + getConfig(cfgGen, _configUri.getConfigId(), _configUri.getContext()); LOG(config, "fdispatch version %s (RPC-port: %d, transport at %d)", FastS_VersionTag, _config->frtport, _config->ptport); _componentConfig.addConfig(vespalib::ComponentConfigProducer::Config("fdispatch", cfgGen, "config only obtained at startup")); + _fdispatchrcFetcher.subscribe<FdispatchrcConfig>(configUri.getConfigId(), this); + _fdispatchrcFetcher.start(); } +namespace { -void -Fdispatch::CheckCacheMaxEntries(unsigned int queryCacheMaxEntries, - unsigned int queryAttrCacheMaxEntries) +bool needRestart(const FdispatchrcConfig & curr, const FdispatchrcConfig & next) { - if (queryAttrCacheMaxEntries == 0) - return; + if (curr.frtport != next.frtport) { + LOG(error, "FRT port has changed from %d to %d.", curr.frtport, next.frtport); + return true; + } + if (curr.ptport != next.ptport) { + LOG(error, "PT port has changed from %d to %d.", curr.ptport, next.ptport); + return true; + } + if (curr.healthport != next.healthport) { + LOG(error, "Health port has changed from %d to %d.", curr.healthport, next.healthport); + return true; + } + return false; +} + +} - if ((queryAttrCacheMaxEntries <= queryCacheMaxEntries) || - (queryCacheMaxEntries == 0)) { - FastS_abort("Please edit fdispatchrc such that " - "queryattrcachequeries > querycachequeries."); +void Fdispatch::configure(std::unique_ptr<FdispatchrcConfig> cfg) +{ + if (cfg && _config) { + if ( needRestart(*_config, *cfg) ) { + const int sleepMS = (1.0 + 30 * _rndGen.nextDouble()) * 1000; + LOG(error, "Will restart by abort in %d ms.", sleepMS); + std::this_thread::sleep_for(std::chrono::milliseconds(sleepMS)); + _needRestart.store(true); + } } } @@ -298,16 +313,13 @@ Fdispatch::Init(void) _timeouts = 0; _checkLimit = 60; - FS4PersistentPacketStreamer::Instance.SetCompressionLimit( - _config->packetcompresslimit); - FS4PersistentPacketStreamer::Instance.SetCompressionLevel( - _config->packetcompresslevel); - FS4PersistentPacketStreamer::Instance.SetCompressionType( - convert(_config->packetcompresstype)); + FS4PersistentPacketStreamer::Instance.SetCompressionLimit(_config->packetcompresslimit); + FS4PersistentPacketStreamer::Instance.SetCompressionLevel(_config->packetcompresslevel); + FS4PersistentPacketStreamer::Instance.SetCompressionType(convert(_config->packetcompresstype)); LOG(debug, "Creating FNET transport"); - _transport = new FNET_Transport(_config->transportthreads); + _transport = std::make_unique<FNET_Transport>(_config->transportthreads); // grab node slowness limit defaults @@ -317,22 +329,20 @@ Fdispatch::Init(void) FastS_DataSetDesc::SetDefaultSlowDocsumLimitBias(_config->defaultslowdocsumlimitbias); maxthreads = _config->maxthreads; - _mypool = new FastOS_ThreadPool(256 * 1024, maxthreads); + _mypool = std::make_unique<FastOS_ThreadPool>(256 * 1024, maxthreads); // Max interval betw read from socket. FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent] = _config->maxsocksilent; - if (_transport != NULL) + if (_transport) { _transport->SetIOCTimeOut((uint32_t) (FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent] * 1000.0)); + } char timestr[40]; - FastS_TimeOut::WriteTime(timestr, sizeof(timestr), - FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent]); - LOG(debug, - "VERBOSE: Max time between successful read from a socket: %s", - timestr); + FastS_TimeOut::WriteTime(timestr, sizeof(timestr), FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent]); + LOG(debug, "VERBOSE: Max time between successful read from a socket: %s", timestr); FastS_QueryCacheUtil::_systemMaxHits = std::numeric_limits<int>::max(); LOG(debug, "VERBOSE: maxhits: %d", FastS_QueryCacheUtil::_systemMaxHits); @@ -341,9 +351,7 @@ Fdispatch::Init(void) const uint32_t linesize = 1; if (FastS_QueryCacheUtil::_systemMaxHits < linesize && FastS_QueryCacheUtil::_maxOffset < linesize - FastS_QueryCacheUtil::_systemMaxHits) { - LOG(warning, - "maxoffset must be >= %d! (overriding config value)", - linesize - FastS_QueryCacheUtil::_systemMaxHits); + LOG(warning, "maxoffset must be >= %d! (overriding config value)", linesize - FastS_QueryCacheUtil::_systemMaxHits); FastS_QueryCacheUtil::_maxOffset = linesize - FastS_QueryCacheUtil::_systemMaxHits; } LOG(debug, "VERBOSE: maxoffset: %d", FastS_QueryCacheUtil::_maxOffset); @@ -354,25 +362,21 @@ Fdispatch::Init(void) LOG(debug, "Using port number %d", ptportnum); - _nodeManager = new FastS_NodeManager(_componentConfig, this, _partition); - + _nodeManager = std::make_unique<FastS_NodeManager>(_componentConfig, this, _partition); GetFNETTransport()->SetTCPNoDelay(_config->transportnodelay); GetFNETTransport()->SetDirectWrite(_config->transportdirectwrite); assert (ptportnum != 0); - _engineAdapter = new fdispatch::EngineAdapter(this, _mypool); - _transportServer = new search::engine::TransportServer - (*_engineAdapter, *_engineAdapter, *_engineAdapter, ptportnum, search::engine::TransportServer::DEBUG_ALL); + _engineAdapter = std::make_unique<fdispatch::EngineAdapter>(this, _mypool.get()); + _transportServer = std::make_unique<TransportServer>(*_engineAdapter, *_engineAdapter, *_engineAdapter, ptportnum, search::engine::TransportServer::DEBUG_ALL); _transportServer->setTCPNoDelay(_config->transportnodelay); _transportServer->setDirectWrite(_config->transportdirectwrite); if (!_transportServer->start()) { - delete _transportServer; - _transportServer = NULL; - delete _engineAdapter; - _engineAdapter = NULL; + _transportServer.reset(); + _engineAdapter.reset(); LOG(error, "CRITICAL: Failed to init upwards FNET transport on port %d", ptportnum); return false; } @@ -380,21 +384,19 @@ Fdispatch::Init(void) _nodeManager->SubscribePartMap(_configUri); if (_config->frtport != 0) { - _rpc = new FastS_fdispatch_RPC(this); - FastS_assert(_rpc != NULL); + _rpc = std::make_unique<FastS_fdispatch_RPC>(this); if (!_rpc->Init(_config->frtport, _configUri.getConfigId())) { LOG(error, "RPC init failed"); - delete _rpc; - _rpc = NULL; + _rpc.reset(); } } else { - _rpc = NULL; + _rpc.reset(); } // Kick off fdispatch administrative threads. - if (_transport != NULL) { + if (_transport) { _FNET_adapter.init(); - bool rc = _transport->Start(_mypool); + bool rc = _transport->Start(_mypool.get()); if (rc) { LOG(debug, "Started FNET transport"); _transportStarted = true; @@ -403,7 +405,7 @@ Fdispatch::Init(void) } } FastOS_Thread::Sleep(1000); - if (_rpc != NULL) { + if (_rpc) { _rpc->Start(); } _healthPort = _config->healthport; diff --git a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h index b4ff8d9f37e..f4fa80a77ab 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h +++ b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h @@ -10,6 +10,7 @@ #include <vespa/searchcore/config/config-fdispatchrc.h> #include <vespa/config/subscription/configuri.h> #include <vespa/vespalib/net/simple_component_config_producer.h> +#include <vespa/vespalib/util/random.h> class FastS_NodeManager; class FastS_fdispatch_RPC; @@ -53,33 +54,40 @@ public: /** * Note: There is only one instance of this. */ -class Fdispatch : public FastS_AppContext +class Fdispatch : public FastS_AppContext, + public config::IFetcherCallback<vespa::config::search::core::FdispatchrcConfig> { private: + typedef search::engine::TransportServer TransportServer; + typedef vespa::config::search::core::FdispatchrcConfig FdispatchrcConfig; Fdispatch(const Fdispatch &); Fdispatch& operator=(const Fdispatch &); - FastOS_ThreadPool *_mypool; - EngineAdapter *_engineAdapter; - search::engine::TransportServer *_transportServer; + std::unique_ptr<FastOS_ThreadPool> _mypool; + std::unique_ptr<EngineAdapter> _engineAdapter; + std::unique_ptr<TransportServer> _transportServer; vespalib::SimpleComponentConfigProducer _componentConfig; - FastS_NodeManager *_nodeManager; - FNET_Transport *_transport; - FastS_FNETAdapter _FNET_adapter; - FastS_fdispatch_RPC *_rpc; - std::unique_ptr<vespa::config::search::core::FdispatchrcConfig> _config; - config::ConfigUri _configUri; + std::unique_ptr<FastS_NodeManager> _nodeManager; + std::unique_ptr<FNET_Transport> _transport; + FastS_FNETAdapter _FNET_adapter; + std::unique_ptr<FastS_fdispatch_RPC> _rpc; + std::unique_ptr<FdispatchrcConfig> _config; + config::ConfigUri _configUri; + config::ConfigFetcher _fdispatchrcFetcher; + vespalib::RandomGen _rndGen; unsigned int _partition; - bool _tempFail; - bool _FNETLiveCounterDanger; - bool _FNETLiveCounterWarned; - bool _FNETLiveCounterFailed; - bool _transportStarted; + bool _tempFail; + bool _FNETLiveCounterDanger; + bool _FNETLiveCounterWarned; + bool _FNETLiveCounterFailed; + bool _transportStarted; unsigned int _lastFNETLiveCounter; - FastOS_Time _FNETLiveCounterDangerStart; + FastOS_Time _FNETLiveCounterDangerStart; unsigned int _timeouts; unsigned int _checkLimit; - int _healthPort; + int _healthPort; + std::atomic<bool> _needRestart; + void configure(std::unique_ptr<FdispatchrcConfig> cfg); public: // Implements FastS_AppContext virtual FNET_Transport *GetFNETTransport(); @@ -95,10 +103,6 @@ public: int getHealthPort() const { return _healthPort; } vespalib::SimpleComponentConfigProducer &getComponentConfig() { return _componentConfig; } - void - CheckCacheMaxEntries(unsigned int queryCacheMaxEntries, - unsigned int queryAttrCacheMaxEntries); - Fdispatch(const config::ConfigUri &configUri); ~Fdispatch(void); }; |