summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-10-06 14:05:38 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-10-06 14:58:28 +0200
commitcb60fccb0fb3bf92bf2a223324dad44e986ae54d (patch)
treebf3dc2a59ae2a4b501bbded96ce68fc56f2b2f4e /searchcore
parent3901ca812608d7c2fbcfdd7ca0524a8758a212ac (diff)
Let config reflect what it is, a threadpool serving a range of background task in the backend.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp70
3 files changed, 42 insertions, 40 deletions
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 594a8717d7f..8c01c45dac8 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -256,6 +256,8 @@ summary.log.maxbucketspread double default=2.5
summary.log.minfilesizefactor double default=0.2
## Number of threads used for compressing incomming documents/compacting.
+## Deprecated. Use background.threads instead.
+## TODO Remove
summary.log.numthreads int default=8 restart
## Control io options during flush of stored documents.
@@ -376,6 +378,11 @@ visit.ignoremaxbytes bool default=true
## When set to 0 (default) we use 1 separate thread per document database.
initialize.threads int default = 0
+## Number of worker threads doing background compaction/compression tasks.
+## They all live i a shared thread pool.
+## When set to 0 (default), it will have enough threads to saturate half of the cores.
+background.threads int default=0
+
## Portion of enumstore address space that can be used before put and update
## portion of feed is blocked.
writefilter.attribute.enumstorelimit double default = 0.9
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
index f45fc658129..6aea5234fbe 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp
@@ -170,9 +170,8 @@ deriveConfig(const ProtonConfig::Summary & summary, const ProtonConfig::Flush::M
logConfig.setMaxFileSize(log.maxfilesize)
.setMaxDiskBloatFactor(std::min(flush.diskbloatfactor, flush.each.diskbloatfactor))
.setMaxBucketSpread(log.maxbucketspread).setMinFileSizeFactor(log.minfilesizefactor)
- .setNumThreads(log.numthreads).compact2ActiveFile(log.compact2activefile)
- .compactCompression(deriveCompression(log.compact.compression)).setFileConfig(fileConfig)
- .disableCrcOnRead(chunk.skipcrconread);
+ .compact2ActiveFile(log.compact2activefile).compactCompression(deriveCompression(log.compact.compression))
+ .setFileConfig(fileConfig).disableCrcOnRead(chunk.skipcrconread);
return LogDocumentStore::Config(config, logConfig);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 34cfaaeddb2..fbac7cb4278 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -85,12 +85,24 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton, const HwInfo &hwInfo)
hwInfo);
}
+size_t
+deriveBackgroundThreads(const ProtonConfig & proton) {
+ size_t threads = std::thread::hardware_concurrency();
+ if (proton.background.threads != 0) {
+ threads = proton.background.threads;
+ }
+ threads = std::max(threads, size_t(proton.summary.log.numthreads));
+
+ // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice,
+ // but we will not be cheap and give #documentsdbs * 2
+ return std::max(threads, proton.documentdb.size() * 2);;
}
-static const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
+const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
-Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_,
- const vespalib::string &creator)
+}
+
+Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_, const vespalib::string &creator)
: _proton(proton_),
_hostName(),
_creator(creator),
@@ -208,7 +220,7 @@ Proton::init()
{
assert( ! _initStarted && ! _initComplete );
_initStarted = true;
- if (_threadPool.NewThread(&_clock, NULL) == NULL) {
+ if (_threadPool.NewThread(&_clock, nullptr) == nullptr) {
throw IllegalStateException("Failed starting thread for the cheap clock");
}
_protonConfigFetcher.start();
@@ -234,8 +246,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
hwDiskCfg.samplewritesize,
hwDiskCfg.shared,
hwMemoryCfg.size);
- _hwInfoSampler = std::make_unique<HwInfoSampler>(protonConfig.basedir,
- samplerCfg);
+ _hwInfoSampler = std::make_unique<HwInfoSampler>(protonConfig.basedir, samplerCfg);
_hwInfo = _hwInfoSampler->hwInfo();
setFS4Compression(protonConfig);
_diskMemUsageSampler = std::make_unique<DiskMemUsageSampler>
@@ -244,8 +255,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
_metricsEngine.reset(new MetricsEngine());
_metricsEngine->addMetricsHook(_metricsHook);
- _fileHeaderContext.setClusterName(protonConfig.clustername,
- protonConfig.basedir);
+ _fileHeaderContext.setClusterName(protonConfig.clustername, protonConfig.basedir);
_tls.reset(new TLS(_configUri.createWithNewId(protonConfig.tlsconfigid), _fileHeaderContext));
_matchEngine.reset(new MatchEngine(protonConfig.numsearcherthreads,
protonConfig.numthreadspersearch,
@@ -257,7 +267,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
const ProtonConfig::Flush & flush(protonConfig.flush);
switch (flush.strategy) {
case ProtonConfig::Flush::MEMORY: {
- MemoryFlush::SP memoryFlush = std::make_shared<MemoryFlush>(
+ auto memoryFlush = std::make_shared<MemoryFlush>(
MemoryFlushConfigUpdater::convertConfig(flush.memory), fastos::ClockSystem::now());
_memoryFlushConfigUpdater = std::make_unique<MemoryFlushConfigUpdater>(memoryFlush, flush.memory);
_diskMemUsageSampler->notifier().addDiskMemUsageListener(_memoryFlushConfigUpdater.get());
@@ -290,14 +300,12 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
vespalib::string fileConfigId;
_warmupExecutor.reset(new vespalib::ThreadStackExecutor(4, 128*1024));
- // We need at least 1 guaranteed free worker in order to ensure progress so #documentsdbs + 1 should suffice,
- // but we will not be cheap and give #documentsdbs * 2
- const size_t summaryThreads = std::max(size_t(protonConfig.summary.log.numthreads), protonConfig.documentdb.size() * 2);
+
+ const size_t summaryThreads = deriveBackgroundThreads(protonConfig);
_summaryExecutor.reset(new vespalib::BlockingThreadStackExecutor(summaryThreads, 128*1024, summaryThreads*16));
InitializeThreads initializeThreads;
if (protonConfig.initialize.threads > 0) {
- initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>
- (protonConfig.initialize.threads, 128 * 1024);
+ initializeThreads = std::make_shared<vespalib::ThreadStackExecutor>(protonConfig.initialize.threads, 128 * 1024);
_initDocumentDbsInSequence = (protonConfig.initialize.threads == 1);
}
_protonConfigurer.applyInitialConfig(initializeThreads);
@@ -563,10 +571,8 @@ Proton::addDocumentDB(const document::DocumentType &docType,
try {
ret->start();
} catch (vespalib::Exception &e) {
- LOG(warning,
- "Failed to start database for document type '%s'; %s",
- docTypeName.toString().c_str(),
- e.what());
+ LOG(warning, "Failed to start database for document type '%s'; %s",
+ docTypeName.toString().c_str(), e.what());
return DocumentDB::SP();
}
// Wait for replay done on document dbs added due to reconfigs, since engines are already up and running.
@@ -577,14 +583,12 @@ Proton::addDocumentDB(const document::DocumentType &docType,
_metricsEngine->addDocumentDBMetrics(ret->getMetricsCollection());
_metricsEngine->addMetricsHook(ret->getMetricsUpdateHook());
_documentDBMap[docTypeName] = ret;
- if (_persistenceEngine.get() != NULL) {
+ if (_persistenceEngine) {
// Not allowed to get to service layer to call pause().
std::unique_lock<std::shared_timed_mutex> persistenceWGuard(_persistenceEngine->getWLock());
- PersistenceHandlerProxy::SP
- persistenceHandler(new PersistenceHandlerProxy(ret));
+ auto persistenceHandler = std::make_shared<PersistenceHandlerProxy>(ret);
if (!_isInitializing) {
- _persistenceEngine->
- propagateSavedClusterState(*persistenceHandler);
+ _persistenceEngine->propagateSavedClusterState(*persistenceHandler);
_persistenceEngine->populateInitialBucketDB(bucketSpace, *persistenceHandler);
}
// TODO: Fix race with new cluster state setting.
@@ -649,10 +653,7 @@ Proton::ping(MonitorRequest::UP request, MonitorClient & client)
BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot();
const ProtonConfig &protonConfig = configSnapshot->getProtonConfig();
ret.partid = protonConfig.partition;
- if (_matchEngine->isOnline())
- ret.timestamp = 42; // change to flush caches on tld/qrs
- else
- ret.timestamp = 0;
+ ret.timestamp = (_matchEngine->isOnline()) ? 42 : 0;
ret.activeDocs = getNumActiveDocs();
ret.activeDocsRequested = request->reportActiveDocs;
return reply;
@@ -661,7 +662,7 @@ Proton::ping(MonitorRequest::UP request, MonitorClient & client)
bool
Proton::triggerFlush()
{
- if ((_flushEngine.get() == NULL) || ! _flushEngine->HasThread()) {
+ if (!_flushEngine || ! _flushEngine->HasThread()) {
return false;
}
_flushEngine->triggerFlush();
@@ -682,13 +683,12 @@ createPrepareRestartConfig(const ProtonConfig &protonConfig)
bool
Proton::prepareRestart()
{
- if ((_flushEngine.get() == NULL) || ! _flushEngine->HasThread()) {
+ if (!_flushEngine || ! _flushEngine->HasThread()) {
return false;
}
BootstrapConfig::SP configSnapshot = getActiveConfigSnapshot();
- IFlushStrategy::SP strategy =
- std::make_shared<PrepareRestartFlushStrategy>(
- createPrepareRestartConfig(configSnapshot->getProtonConfig()));
+ auto strategy = std::make_shared<PrepareRestartFlushStrategy>(
+ createPrepareRestartConfig(configSnapshot->getProtonConfig()));
_flushEngine->setStrategy(strategy);
return true;
}
@@ -700,7 +700,7 @@ int countOpenFiles()
static const char * const fd_dir_name = "/proc/self/fd";
int count = 0;
DIR *dp = opendir(fd_dir_name);
- if (dp != NULL) {
+ if (dp != nullptr) {
struct dirent *ptr;
while ((ptr = readdir(dp)) != nullptr) {
if (strcmp(".", ptr->d_name) == 0) continue;
@@ -738,10 +738,6 @@ Proton::updateMetrics(const vespalib::MonitorGuard &)
}
}
-namespace {
-const std::string config_id_tag = "CONFIG ID";
-} // namespace
-
void
Proton::waitForInitDone()
{