diff options
4 files changed, 82 insertions, 60 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp index 2f26c1a2601..bbf803adfe4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp @@ -83,13 +83,7 @@ MemoryFlush::Config::Config(uint64_t maxGlobalMemory_in, MemoryFlush::MemoryFlush(const Config &config, fastos::TimeStamp startTime) : _lock(), - _globalMaxMemory(config.maxGlobalMemory), - _maxGlobalTlsSize(config.maxGlobalTlsSize), - _globalDiskBloatFactor(config.globalDiskBloatFactor), - _maxMemoryGain(config.maxMemoryGain), - _diskBloatFactor(config.diskBloatFactor), - _maxSerialGain(config.maxSerialGain), - _maxTimeGain(config.maxTimeGain), + _config(config), _startTime(startTime) { } @@ -101,6 +95,18 @@ MemoryFlush::MemoryFlush() // empty } +MemoryFlush::Config MemoryFlush::getConfig() const +{ + vespalib::LockGuard guard(_lock); + return _config; +} + +void MemoryFlush::setConfig(const Config &config) +{ + vespalib::LockGuard guard(_lock); + _config = config; +} + FlushContext::List MemoryFlush::getFlushTargets(const FlushContext::List &targetList, const flushengine::TlsStatsMap & @@ -110,13 +116,16 @@ MemoryFlush::getFlushTargets(const FlushContext::List &targetList, uint64_t totalMemory(0); IFlushTarget::DiskGain totalDisk; uint64_t totalTlsSize(0); + const Config config(getConfig()); vespalib::hash_set<const void *> visitedHandlers; fastos::TimeStamp now(fastos::ClockSystem::now()); LOG(debug, "getFlushTargets(): globalMaxMemory(%" PRIu64 "), globalDiskBloatFactor(%f), " "maxMemoryGain(%" PRIu64 "), diskBloatFactor(%f), maxSerialGain(%" PRIu64 "), maxTimeGain(%f), startTime(%f)", - _globalMaxMemory, _globalDiskBloatFactor, _maxMemoryGain, _diskBloatFactor, - _maxSerialGain, _maxTimeGain.sec(), _startTime.sec()); + config.maxGlobalMemory, config.globalDiskBloatFactor, + config.maxMemoryGain, config.diskBloatFactor, + config.maxSerialGain, config.maxTimeGain.sec(), + _startTime.sec()); for (size_t i(0), m(targetList.size()); i < m; i++) { const IFlushTarget & target(*targetList[i]->getTarget()); const IFlushHandler & handler(*targetList[i]->getHandler()); @@ -133,26 +142,26 @@ MemoryFlush::getFlushTargets(const FlushContext::List &targetList, tlsStatsMap.getTlsStats(handler.getName()); if (visitedHandlers.insert(&handler).second) { totalTlsSize += tlsStats.getNumBytes(); - if ((totalTlsSize > _maxGlobalTlsSize) && (order < TLSSIZE)) { + if ((totalTlsSize > config.maxGlobalTlsSize) && (order < TLSSIZE)) { order = TLSSIZE; } } - if (((totalMemory >= _globalMaxMemory) || - (mgain >= _maxMemoryGain)) && (order < MEMORY)) { + if (((totalMemory >= config.maxGlobalMemory) || + (mgain >= config.maxMemoryGain)) && (order < MEMORY)) { order = MEMORY; } else if (((totalDisk.gain() > - _globalDiskBloatFactor * std::max(100000000l, + config.globalDiskBloatFactor * std::max(100000000l, std::max(totalDisk.getBefore(), totalDisk.getAfter()))) - || dgain.gain() > _diskBloatFactor * + || dgain.gain() > config.diskBloatFactor * std::max(10000000l, std::max(dgain.getBefore(), dgain.getAfter()))) && (order < DISKBLOAT) ) { order = DISKBLOAT; - } else if ((serialDiff >= _maxSerialGain) && (order < MAXSERIAL)) { + } else if ((serialDiff >= config.maxSerialGain) && (order < MAXSERIAL)) { order = MAXSERIAL; - } else if ((timeDiff >= _maxTimeGain) && (order < MAXAGE)) { + } else if ((timeDiff >= config.maxTimeGain) && (order < MAXAGE)) { order = MAXAGE; } LOG(debug, diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h index cb0c84b6364..165ba4de70e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h +++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h @@ -15,12 +15,21 @@ class MemoryFlush : public boost::noncopyable, public: struct Config { + /// Global maxMemory uint64_t maxGlobalMemory; + /// Maximum global tls size. uint64_t maxGlobalTlsSize; + /// Maximum global disk bloat factor. When this limit is reached + /// flush is forced. double globalDiskBloatFactor; + /// Maximum memory saved. When this limit is reached flush is forced. int64_t maxMemoryGain; + /// Maximum disk bloat factor. When this limit is reached + /// flush is forced. double diskBloatFactor; + /// Maximum count of what a target can have outstanding in the TLS. int64_t maxSerialGain; + /// Maximum age of unflushed data. fastos::TimeStamp maxTimeGain; Config(); Config(uint64_t maxGlobalMemory_in, @@ -35,21 +44,7 @@ public: private: /// Needed as flushDone is called in different context from the rest vespalib::Lock _lock; - /// Global maxMemory - uint64_t _globalMaxMemory; - /// Maximum global tls size. - uint64_t _maxGlobalTlsSize; - /// Maximum global disk bloat factor. When this limit is reached - /// flush is forced. - double _globalDiskBloatFactor; - /// Maximum memory saved. When this limit is reached flush is forced. - int64_t _maxMemoryGain; - /// Maximum disk bloat factor. When this limit is reached flush is forced. - double _diskBloatFactor; - /// Maximum count of what a target can have outstanding in the TLS. - int64_t _maxSerialGain; - /// Maximum age of unflushed data. - fastos::TimeStamp _maxTimeGain; + Config _config; /// The time when the strategy was started. fastos::TimeStamp _startTime; @@ -73,6 +68,8 @@ private: const flushengine::TlsStatsMap &_tlsStatsMap; }; + Config getConfig() const; + public: MemoryFlush(); @@ -84,6 +81,8 @@ public: getFlushTargets(const FlushContext::List &targetList, const flushengine::TlsStatsMap & tlsStatsMap) const override; + + void setConfig(const Config &config); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 60c54bf1ef3..ef66302b181 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -89,10 +89,41 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton) proton.writefilter.sampleinterval); } +static constexpr size_t TOTAL_HARD_MEMORY_LIMIT=16*1024*1024*1024ul; +static constexpr size_t EACH_HARD_MEMORY_LIMIT=12*1024*1024*1024ul; + +MemoryFlush::Config +memoryFlushConfig(const ProtonConfig::Flush &flush) +{ + size_t totalMaxMemory = flush.memory.maxmemory; + if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) { + LOG(warning, "flush.memory.maxmemory=%ld can not" + " be set above the hard limit of %ld so we cap it", + flush.memory.maxmemory, + TOTAL_HARD_MEMORY_LIMIT); + totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT; + } + size_t eachMaxMemory = flush.memory.each.maxmemory; + if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) { + LOG(warning, "flush.memory.each.maxmemory=%ld can not" + " be set above the hard limit of %ld so we cap it", + flush.memory.maxmemory, + EACH_HARD_MEMORY_LIMIT); + eachMaxMemory = EACH_HARD_MEMORY_LIMIT; + } + return MemoryFlush::Config(totalMaxMemory, + flush.memory.maxtlssize, + flush.memory.diskbloatfactor, + eachMaxMemory, + flush.memory.each.diskbloatfactor, + flush.memory.maxage.serial, + static_cast<long> + (flush.memory.maxage.time) * + fastos::TimeStamp::NANO); +} + } -static const size_t TOTAL_HARD_MEMORY_LIMIT=16*1024*1024*1024ul; -static const size_t EACH_HARD_MEMORY_LIMIT=12*1024*1024*1024ul; static const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component"; Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_, @@ -262,34 +293,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) IFlushStrategy::SP strategy; const ProtonConfig::Flush & flush(protonConfig.flush); switch (flush.strategy) { - case ProtonConfig::Flush::MEMORY: { - size_t totalMaxMemory = flush.memory.maxmemory; - if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) { - LOG(warning, "flush.memory.maxmemory=%ld can not" - " be set above the hard limit of %ld so we cap it", - flush.memory.maxmemory, - TOTAL_HARD_MEMORY_LIMIT); - totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT; - } - size_t eachMaxMemory = flush.memory.each.maxmemory; - if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) { - LOG(warning, "flush.memory.each.maxmemory=%ld can not" - " be set above the hard limit of %ld so we cap it", - flush.memory.maxmemory, - EACH_HARD_MEMORY_LIMIT); - eachMaxMemory = EACH_HARD_MEMORY_LIMIT; - } + case ProtonConfig::Flush::MEMORY: strategy = std::make_shared<MemoryFlush>( - MemoryFlush::Config(totalMaxMemory, - flush.memory.maxtlssize, - flush.memory.diskbloatfactor, - eachMaxMemory, - flush.memory.each.diskbloatfactor, - flush.memory.maxage.serial, - static_cast<long> - (flush.memory.maxage.time) * - fastos::TimeStamp::NANO)); - } + memoryFlushConfig(flush)); break; case ProtonConfig::Flush::SIMPLE: default: @@ -299,6 +305,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot) vespalib::mkdir(protonConfig.basedir + "/documents", true); vespalib::chdir(protonConfig.basedir); _tls->start(); + _strategy = strategy; _flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()), strategy, flush.maxconcurrent, flush.idleinterval*1000, true)); _fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL)); @@ -538,6 +545,12 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot, configSnapshot->getGeneration())); _diskMemUsageSampler-> setConfig(diskMemUsageSamplerConfig(protonConfig)); + std::shared_ptr<MemoryFlush> memoryFlushStrategy = + std::dynamic_pointer_cast<MemoryFlush>(_strategy); + if (memoryFlushStrategy) { + memoryFlushStrategy->setConfig(memoryFlushConfig(protonConfig.flush)); + _flushEngine->kick(); + } } void diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 023a6173984..be8e602fa2a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -106,6 +106,7 @@ private: MatchEngine::UP _matchEngine; SummaryEngine::UP _summaryEngine; DocsumBySlime::UP _docsumBySlime; + IFlushStrategy::SP _strategy; FlushEngine::UP _flushEngine; RPCHooks::UP _rpcHooks; HealthAdapter _healthAdapter; |