summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorArne H Juul <arnej27959@users.noreply.github.com>2016-06-17 22:34:46 +0200
committerGitHub <noreply@github.com>2016-06-17 22:34:46 +0200
commit4492247bff82d0a4b71cd6e40daa8561f21895b4 (patch)
tree9dd1053ab011b321fe2e2e88e659af023727ad15 /searchcore
parent3bba0952465891fb3efb941e1b9f0d3b39eb7265 (diff)
parent92e27feb0edb017826ebe8d33dbf882b1c9f6f23 (diff)
Merge pull request #47 from yahoo/balder/make-tld-restart-on-port-changes
Balder/make tld restart on port changes
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp162
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h46
2 files changed, 107 insertions, 101 deletions
diff --git a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp
index 193c4365189..6e06c91a902 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp
+++ b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.cpp
@@ -31,18 +31,12 @@ using document::CompressionConfig;
char FastS_VersionTag[] = V_TAG;
-int FastS_verbose = 0;
-/** @todo Use a config file for these variables */
-int FastS_nsearches;
-double FastS_searchtime;
-
-
namespace fdispatch
{
FastS_FNETAdapter::FastS_FNETAdapter(FastS_AppContext *appCtx)
: _appCtx(appCtx),
- _nodeManager(NULL),
+ _nodeManager(),
_timeKeeper(NULL),
_transport(NULL),
_last_now(0.0),
@@ -92,57 +86,57 @@ FastS_FNETAdapter::fini()
Fdispatch::~Fdispatch(void)
{
- if (_transportServer != NULL) {
+ if (_transportServer) {
_transportServer->shutDown(); // sync shutdown
}
_FNET_adapter.fini();
- if (_nodeManager != NULL)
+ if (_nodeManager) {
_nodeManager->ShutdownConfig();
- if (_transport != NULL && _transportStarted)
+ }
+ if (_transport && _transportStarted) {
_transport->ShutDown(true); // sync shutdown
- if (_rpc != NULL)
+ }
+ if (_rpc) {
_rpc->ShutDown(); // sync shutdown
+ }
LOG(debug, "Will close threadpool");
_mypool->Close();
LOG(debug, "Has closed threadpool");
- delete _transportServer;
- delete _engineAdapter;
- delete _nodeManager;
- if (_transport != NULL) {
- delete _transport;
- }
- delete _rpc;
- delete _mypool;
+ _transportServer.reset();
+ _engineAdapter.reset();
+ _nodeManager.reset();
+ _transport.reset();
+ _rpc.reset();
+ _mypool.reset();
}
FNET_Transport *
Fdispatch::GetFNETTransport()
{
- return _transport;
+ return _transport.get();
}
FNET_Scheduler *
Fdispatch::GetFNETScheduler()
{
- return (_transport == NULL) ?
- NULL : _transport->GetScheduler();
+ return ( ! _transport) ? NULL : _transport->GetScheduler();
}
FastS_NodeManager *
Fdispatch::GetNodeManager()
{
- return _nodeManager;
+ return _nodeManager.get();
}
FastS_DataSetCollection *
Fdispatch::GetDataSetCollection()
{
- if (_nodeManager == NULL)
+ if ( ! _nodeManager)
return NULL;
return _nodeManager->GetDataSetCollection();
}
@@ -151,14 +145,14 @@ Fdispatch::GetDataSetCollection()
FastOS_ThreadPool *
Fdispatch::GetThreadPool()
{
- return _mypool;
+ return _mypool.get();
}
bool
Fdispatch::Failed(void)
{
- return ( (_transportServer != NULL && _transportServer->isFailed()));
+ return ( (_transportServer && _transportServer->isFailed())) || _needRestart;
}
@@ -204,7 +198,7 @@ Fdispatch::CheckTempFail(void)
if (failflag == _tempFail)
return ret;
- if (_transportServer != NULL) {
+ if (_transportServer) {
if (failflag) {
_transportServer->setListen(false);
LOG(error, "Disabling fnet server interface");
@@ -223,16 +217,18 @@ Fdispatch::CheckTempFail(void)
* Set up stuff as specified in the fdispatch-rc-file.
*/
Fdispatch::Fdispatch(const config::ConfigUri &configUri)
- : _mypool(NULL),
- _engineAdapter(NULL),
- _transportServer(NULL),
+ : _mypool(),
+ _engineAdapter(),
+ _transportServer(),
_componentConfig(),
- _nodeManager(NULL),
- _transport(NULL),
+ _nodeManager(),
+ _transport(),
_FNET_adapter(this),
- _rpc(NULL),
+ _rpc(),
_config(),
_configUri(configUri),
+ _fdispatchrcFetcher(configUri.getContext()),
+ _rndGen(),
_partition(0),
_tempFail(false),
_FNETLiveCounterDanger(false),
@@ -243,32 +239,51 @@ Fdispatch::Fdispatch(const config::ConfigUri &configUri)
_FNETLiveCounterDangerStart(),
_timeouts(0u),
_checkLimit(0u),
- _healthPort(0)
+ _healthPort(0),
+ _needRestart(false)
{
int64_t cfgGen = -1;
_config = config::ConfigGetter<FdispatchrcConfig>::
- getConfig(cfgGen,
- _configUri.getConfigId(),
- _configUri.getContext());
+ getConfig(cfgGen, _configUri.getConfigId(), _configUri.getContext());
LOG(config, "fdispatch version %s (RPC-port: %d, transport at %d)",
FastS_VersionTag, _config->frtport, _config->ptport);
_componentConfig.addConfig(vespalib::ComponentConfigProducer::Config("fdispatch", cfgGen,
"config only obtained at startup"));
+ _fdispatchrcFetcher.subscribe<FdispatchrcConfig>(configUri.getConfigId(), this);
+ _fdispatchrcFetcher.start();
}
+namespace {
-void
-Fdispatch::CheckCacheMaxEntries(unsigned int queryCacheMaxEntries,
- unsigned int queryAttrCacheMaxEntries)
+bool needRestart(const FdispatchrcConfig & curr, const FdispatchrcConfig & next)
{
- if (queryAttrCacheMaxEntries == 0)
- return;
+ if (curr.frtport != next.frtport) {
+ LOG(error, "FRT port has changed from %d to %d.", curr.frtport, next.frtport);
+ return true;
+ }
+ if (curr.ptport != next.ptport) {
+ LOG(error, "PT port has changed from %d to %d.", curr.ptport, next.ptport);
+ return true;
+ }
+ if (curr.healthport != next.healthport) {
+ LOG(error, "Health port has changed from %d to %d.", curr.healthport, next.healthport);
+ return true;
+ }
+ return false;
+}
+
+}
- if ((queryAttrCacheMaxEntries <= queryCacheMaxEntries) ||
- (queryCacheMaxEntries == 0)) {
- FastS_abort("Please edit fdispatchrc such that "
- "queryattrcachequeries > querycachequeries.");
+void Fdispatch::configure(std::unique_ptr<FdispatchrcConfig> cfg)
+{
+ if (cfg && _config) {
+ if ( needRestart(*_config, *cfg) ) {
+ const int sleepMS = (1.0 + 30 * _rndGen.nextDouble()) * 1000;
+ LOG(error, "Will restart by abort in %d ms.", sleepMS);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleepMS));
+ _needRestart.store(true);
+ }
}
}
@@ -298,16 +313,13 @@ Fdispatch::Init(void)
_timeouts = 0;
_checkLimit = 60;
- FS4PersistentPacketStreamer::Instance.SetCompressionLimit(
- _config->packetcompresslimit);
- FS4PersistentPacketStreamer::Instance.SetCompressionLevel(
- _config->packetcompresslevel);
- FS4PersistentPacketStreamer::Instance.SetCompressionType(
- convert(_config->packetcompresstype));
+ FS4PersistentPacketStreamer::Instance.SetCompressionLimit(_config->packetcompresslimit);
+ FS4PersistentPacketStreamer::Instance.SetCompressionLevel(_config->packetcompresslevel);
+ FS4PersistentPacketStreamer::Instance.SetCompressionType(convert(_config->packetcompresstype));
LOG(debug, "Creating FNET transport");
- _transport = new FNET_Transport(_config->transportthreads);
+ _transport = std::make_unique<FNET_Transport>(_config->transportthreads);
// grab node slowness limit defaults
@@ -317,22 +329,20 @@ Fdispatch::Init(void)
FastS_DataSetDesc::SetDefaultSlowDocsumLimitBias(_config->defaultslowdocsumlimitbias);
maxthreads = _config->maxthreads;
- _mypool = new FastOS_ThreadPool(256 * 1024, maxthreads);
+ _mypool = std::make_unique<FastOS_ThreadPool>(256 * 1024, maxthreads);
// Max interval betw read from socket.
FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent] =
_config->maxsocksilent;
- if (_transport != NULL)
+ if (_transport) {
_transport->SetIOCTimeOut((uint32_t)
(FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent] * 1000.0));
+ }
char timestr[40];
- FastS_TimeOut::WriteTime(timestr, sizeof(timestr),
- FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent]);
- LOG(debug,
- "VERBOSE: Max time between successful read from a socket: %s",
- timestr);
+ FastS_TimeOut::WriteTime(timestr, sizeof(timestr), FastS_TimeOut::_val[FastS_TimeOut::maxSockSilent]);
+ LOG(debug, "VERBOSE: Max time between successful read from a socket: %s", timestr);
FastS_QueryCacheUtil::_systemMaxHits = std::numeric_limits<int>::max();
LOG(debug, "VERBOSE: maxhits: %d", FastS_QueryCacheUtil::_systemMaxHits);
@@ -341,9 +351,7 @@ Fdispatch::Init(void)
const uint32_t linesize = 1;
if (FastS_QueryCacheUtil::_systemMaxHits < linesize
&& FastS_QueryCacheUtil::_maxOffset < linesize - FastS_QueryCacheUtil::_systemMaxHits) {
- LOG(warning,
- "maxoffset must be >= %d! (overriding config value)",
- linesize - FastS_QueryCacheUtil::_systemMaxHits);
+ LOG(warning, "maxoffset must be >= %d! (overriding config value)", linesize - FastS_QueryCacheUtil::_systemMaxHits);
FastS_QueryCacheUtil::_maxOffset = linesize - FastS_QueryCacheUtil::_systemMaxHits;
}
LOG(debug, "VERBOSE: maxoffset: %d", FastS_QueryCacheUtil::_maxOffset);
@@ -354,25 +362,21 @@ Fdispatch::Init(void)
LOG(debug, "Using port number %d", ptportnum);
- _nodeManager = new FastS_NodeManager(_componentConfig, this, _partition);
-
+ _nodeManager = std::make_unique<FastS_NodeManager>(_componentConfig, this, _partition);
GetFNETTransport()->SetTCPNoDelay(_config->transportnodelay);
GetFNETTransport()->SetDirectWrite(_config->transportdirectwrite);
assert (ptportnum != 0);
- _engineAdapter = new fdispatch::EngineAdapter(this, _mypool);
- _transportServer = new search::engine::TransportServer
- (*_engineAdapter, *_engineAdapter, *_engineAdapter, ptportnum, search::engine::TransportServer::DEBUG_ALL);
+ _engineAdapter = std::make_unique<fdispatch::EngineAdapter>(this, _mypool.get());
+ _transportServer = std::make_unique<TransportServer>(*_engineAdapter, *_engineAdapter, *_engineAdapter, ptportnum, search::engine::TransportServer::DEBUG_ALL);
_transportServer->setTCPNoDelay(_config->transportnodelay);
_transportServer->setDirectWrite(_config->transportdirectwrite);
if (!_transportServer->start()) {
- delete _transportServer;
- _transportServer = NULL;
- delete _engineAdapter;
- _engineAdapter = NULL;
+ _transportServer.reset();
+ _engineAdapter.reset();
LOG(error, "CRITICAL: Failed to init upwards FNET transport on port %d", ptportnum);
return false;
}
@@ -380,21 +384,19 @@ Fdispatch::Init(void)
_nodeManager->SubscribePartMap(_configUri);
if (_config->frtport != 0) {
- _rpc = new FastS_fdispatch_RPC(this);
- FastS_assert(_rpc != NULL);
+ _rpc = std::make_unique<FastS_fdispatch_RPC>(this);
if (!_rpc->Init(_config->frtport, _configUri.getConfigId())) {
LOG(error, "RPC init failed");
- delete _rpc;
- _rpc = NULL;
+ _rpc.reset();
}
} else {
- _rpc = NULL;
+ _rpc.reset();
}
// Kick off fdispatch administrative threads.
- if (_transport != NULL) {
+ if (_transport) {
_FNET_adapter.init();
- bool rc = _transport->Start(_mypool);
+ bool rc = _transport->Start(_mypool.get());
if (rc) {
LOG(debug, "Started FNET transport");
_transportStarted = true;
@@ -403,7 +405,7 @@ Fdispatch::Init(void)
}
}
FastOS_Thread::Sleep(1000);
- if (_rpc != NULL) {
+ if (_rpc) {
_rpc->Start();
}
_healthPort = _config->healthport;
diff --git a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h
index b4ff8d9f37e..f4fa80a77ab 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h
+++ b/searchcore/src/vespa/searchcore/fdispatch/program/fdispatch.h
@@ -10,6 +10,7 @@
#include <vespa/searchcore/config/config-fdispatchrc.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/vespalib/net/simple_component_config_producer.h>
+#include <vespa/vespalib/util/random.h>
class FastS_NodeManager;
class FastS_fdispatch_RPC;
@@ -53,33 +54,40 @@ public:
/**
* Note: There is only one instance of this.
*/
-class Fdispatch : public FastS_AppContext
+class Fdispatch : public FastS_AppContext,
+ public config::IFetcherCallback<vespa::config::search::core::FdispatchrcConfig>
{
private:
+ typedef search::engine::TransportServer TransportServer;
+ typedef vespa::config::search::core::FdispatchrcConfig FdispatchrcConfig;
Fdispatch(const Fdispatch &);
Fdispatch& operator=(const Fdispatch &);
- FastOS_ThreadPool *_mypool;
- EngineAdapter *_engineAdapter;
- search::engine::TransportServer *_transportServer;
+ std::unique_ptr<FastOS_ThreadPool> _mypool;
+ std::unique_ptr<EngineAdapter> _engineAdapter;
+ std::unique_ptr<TransportServer> _transportServer;
vespalib::SimpleComponentConfigProducer _componentConfig;
- FastS_NodeManager *_nodeManager;
- FNET_Transport *_transport;
- FastS_FNETAdapter _FNET_adapter;
- FastS_fdispatch_RPC *_rpc;
- std::unique_ptr<vespa::config::search::core::FdispatchrcConfig> _config;
- config::ConfigUri _configUri;
+ std::unique_ptr<FastS_NodeManager> _nodeManager;
+ std::unique_ptr<FNET_Transport> _transport;
+ FastS_FNETAdapter _FNET_adapter;
+ std::unique_ptr<FastS_fdispatch_RPC> _rpc;
+ std::unique_ptr<FdispatchrcConfig> _config;
+ config::ConfigUri _configUri;
+ config::ConfigFetcher _fdispatchrcFetcher;
+ vespalib::RandomGen _rndGen;
unsigned int _partition;
- bool _tempFail;
- bool _FNETLiveCounterDanger;
- bool _FNETLiveCounterWarned;
- bool _FNETLiveCounterFailed;
- bool _transportStarted;
+ bool _tempFail;
+ bool _FNETLiveCounterDanger;
+ bool _FNETLiveCounterWarned;
+ bool _FNETLiveCounterFailed;
+ bool _transportStarted;
unsigned int _lastFNETLiveCounter;
- FastOS_Time _FNETLiveCounterDangerStart;
+ FastOS_Time _FNETLiveCounterDangerStart;
unsigned int _timeouts;
unsigned int _checkLimit;
- int _healthPort;
+ int _healthPort;
+ std::atomic<bool> _needRestart;
+ void configure(std::unique_ptr<FdispatchrcConfig> cfg);
public:
// Implements FastS_AppContext
virtual FNET_Transport *GetFNETTransport();
@@ -95,10 +103,6 @@ public:
int getHealthPort() const { return _healthPort; }
vespalib::SimpleComponentConfigProducer &getComponentConfig() { return _componentConfig; }
- void
- CheckCacheMaxEntries(unsigned int queryCacheMaxEntries,
- unsigned int queryAttrCacheMaxEntries);
-
Fdispatch(const config::ConfigUri &configUri);
~Fdispatch(void);
};