aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@yahoo-inc.com>2016-06-17 11:05:02 +0000
committerTor Egge <Tor.Egge@yahoo-inc.com>2016-06-17 11:05:02 +0000
commitb9db3a327cec650a71be4de51b5e0a4b5923148c (patch)
treea6cc4682010f9a5240b288b491cd35e5530eef0d
parentf1004b3b19fb723292f844ce7b93489bf1a339b4 (diff)
Handle live memory flush strategy config changes:
Use a stable copy of config when calculating target flush order. Forward new config to active memory flush strategy and kick flush engine when config has changed.
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp41
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memoryflush.h29
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp71
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h1
4 files changed, 82 insertions, 60 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
index 2f26c1a2601..bbf803adfe4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.cpp
@@ -83,13 +83,7 @@ MemoryFlush::Config::Config(uint64_t maxGlobalMemory_in,
MemoryFlush::MemoryFlush(const Config &config,
fastos::TimeStamp startTime)
: _lock(),
- _globalMaxMemory(config.maxGlobalMemory),
- _maxGlobalTlsSize(config.maxGlobalTlsSize),
- _globalDiskBloatFactor(config.globalDiskBloatFactor),
- _maxMemoryGain(config.maxMemoryGain),
- _diskBloatFactor(config.diskBloatFactor),
- _maxSerialGain(config.maxSerialGain),
- _maxTimeGain(config.maxTimeGain),
+ _config(config),
_startTime(startTime)
{
}
@@ -101,6 +95,18 @@ MemoryFlush::MemoryFlush()
// empty
}
+MemoryFlush::Config MemoryFlush::getConfig() const
+{
+ vespalib::LockGuard guard(_lock);
+ return _config;
+}
+
+void MemoryFlush::setConfig(const Config &config)
+{
+ vespalib::LockGuard guard(_lock);
+ _config = config;
+}
+
FlushContext::List
MemoryFlush::getFlushTargets(const FlushContext::List &targetList,
const flushengine::TlsStatsMap &
@@ -110,13 +116,16 @@ MemoryFlush::getFlushTargets(const FlushContext::List &targetList,
uint64_t totalMemory(0);
IFlushTarget::DiskGain totalDisk;
uint64_t totalTlsSize(0);
+ const Config config(getConfig());
vespalib::hash_set<const void *> visitedHandlers;
fastos::TimeStamp now(fastos::ClockSystem::now());
LOG(debug,
"getFlushTargets(): globalMaxMemory(%" PRIu64 "), globalDiskBloatFactor(%f), "
"maxMemoryGain(%" PRIu64 "), diskBloatFactor(%f), maxSerialGain(%" PRIu64 "), maxTimeGain(%f), startTime(%f)",
- _globalMaxMemory, _globalDiskBloatFactor, _maxMemoryGain, _diskBloatFactor,
- _maxSerialGain, _maxTimeGain.sec(), _startTime.sec());
+ config.maxGlobalMemory, config.globalDiskBloatFactor,
+ config.maxMemoryGain, config.diskBloatFactor,
+ config.maxSerialGain, config.maxTimeGain.sec(),
+ _startTime.sec());
for (size_t i(0), m(targetList.size()); i < m; i++) {
const IFlushTarget & target(*targetList[i]->getTarget());
const IFlushHandler & handler(*targetList[i]->getHandler());
@@ -133,26 +142,26 @@ MemoryFlush::getFlushTargets(const FlushContext::List &targetList,
tlsStatsMap.getTlsStats(handler.getName());
if (visitedHandlers.insert(&handler).second) {
totalTlsSize += tlsStats.getNumBytes();
- if ((totalTlsSize > _maxGlobalTlsSize) && (order < TLSSIZE)) {
+ if ((totalTlsSize > config.maxGlobalTlsSize) && (order < TLSSIZE)) {
order = TLSSIZE;
}
}
- if (((totalMemory >= _globalMaxMemory) ||
- (mgain >= _maxMemoryGain)) && (order < MEMORY)) {
+ if (((totalMemory >= config.maxGlobalMemory) ||
+ (mgain >= config.maxMemoryGain)) && (order < MEMORY)) {
order = MEMORY;
} else if (((totalDisk.gain() >
- _globalDiskBloatFactor * std::max(100000000l,
+ config.globalDiskBloatFactor * std::max(100000000l,
std::max(totalDisk.getBefore(),
totalDisk.getAfter())))
- || dgain.gain() > _diskBloatFactor *
+ || dgain.gain() > config.diskBloatFactor *
std::max(10000000l,
std::max(dgain.getBefore(), dgain.getAfter())))
&& (order < DISKBLOAT)
) {
order = DISKBLOAT;
- } else if ((serialDiff >= _maxSerialGain) && (order < MAXSERIAL)) {
+ } else if ((serialDiff >= config.maxSerialGain) && (order < MAXSERIAL)) {
order = MAXSERIAL;
- } else if ((timeDiff >= _maxTimeGain) && (order < MAXAGE)) {
+ } else if ((timeDiff >= config.maxTimeGain) && (order < MAXAGE)) {
order = MAXAGE;
}
LOG(debug,
diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
index cb0c84b6364..165ba4de70e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
+++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
@@ -15,12 +15,21 @@ class MemoryFlush : public boost::noncopyable,
public:
struct Config
{
+ /// Global maxMemory
uint64_t maxGlobalMemory;
+ /// Maximum global tls size.
uint64_t maxGlobalTlsSize;
+ /// Maximum global disk bloat factor. When this limit is reached
+ /// flush is forced.
double globalDiskBloatFactor;
+ /// Maximum memory saved. When this limit is reached flush is forced.
int64_t maxMemoryGain;
+ /// Maximum disk bloat factor. When this limit is reached
+ /// flush is forced.
double diskBloatFactor;
+ /// Maximum count of what a target can have outstanding in the TLS.
int64_t maxSerialGain;
+ /// Maximum age of unflushed data.
fastos::TimeStamp maxTimeGain;
Config();
Config(uint64_t maxGlobalMemory_in,
@@ -35,21 +44,7 @@ public:
private:
/// Needed as flushDone is called in different context from the rest
vespalib::Lock _lock;
- /// Global maxMemory
- uint64_t _globalMaxMemory;
- /// Maximum global tls size.
- uint64_t _maxGlobalTlsSize;
- /// Maximum global disk bloat factor. When this limit is reached
- /// flush is forced.
- double _globalDiskBloatFactor;
- /// Maximum memory saved. When this limit is reached flush is forced.
- int64_t _maxMemoryGain;
- /// Maximum disk bloat factor. When this limit is reached flush is forced.
- double _diskBloatFactor;
- /// Maximum count of what a target can have outstanding in the TLS.
- int64_t _maxSerialGain;
- /// Maximum age of unflushed data.
- fastos::TimeStamp _maxTimeGain;
+ Config _config;
/// The time when the strategy was started.
fastos::TimeStamp _startTime;
@@ -73,6 +68,8 @@ private:
const flushengine::TlsStatsMap &_tlsStatsMap;
};
+ Config getConfig() const;
+
public:
MemoryFlush();
@@ -84,6 +81,8 @@ public:
getFlushTargets(const FlushContext::List &targetList,
const flushengine::TlsStatsMap &
tlsStatsMap) const override;
+
+ void setConfig(const Config &config);
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 60c54bf1ef3..ef66302b181 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -89,10 +89,41 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton)
proton.writefilter.sampleinterval);
}
+static constexpr size_t TOTAL_HARD_MEMORY_LIMIT=16*1024*1024*1024ul;
+static constexpr size_t EACH_HARD_MEMORY_LIMIT=12*1024*1024*1024ul;
+
+MemoryFlush::Config
+memoryFlushConfig(const ProtonConfig::Flush &flush)
+{
+ size_t totalMaxMemory = flush.memory.maxmemory;
+ if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) {
+ LOG(warning, "flush.memory.maxmemory=%ld can not"
+ " be set above the hard limit of %ld so we cap it",
+ flush.memory.maxmemory,
+ TOTAL_HARD_MEMORY_LIMIT);
+ totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT;
+ }
+ size_t eachMaxMemory = flush.memory.each.maxmemory;
+ if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) {
+ LOG(warning, "flush.memory.each.maxmemory=%ld can not"
+ " be set above the hard limit of %ld so we cap it",
+ flush.memory.maxmemory,
+ EACH_HARD_MEMORY_LIMIT);
+ eachMaxMemory = EACH_HARD_MEMORY_LIMIT;
+ }
+ return MemoryFlush::Config(totalMaxMemory,
+ flush.memory.maxtlssize,
+ flush.memory.diskbloatfactor,
+ eachMaxMemory,
+ flush.memory.each.diskbloatfactor,
+ flush.memory.maxage.serial,
+ static_cast<long>
+ (flush.memory.maxage.time) *
+ fastos::TimeStamp::NANO);
+}
+
}
-static const size_t TOTAL_HARD_MEMORY_LIMIT=16*1024*1024*1024ul;
-static const size_t EACH_HARD_MEMORY_LIMIT=12*1024*1024*1024ul;
static const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
Proton::ProtonFileHeaderContext::ProtonFileHeaderContext(const Proton &proton_,
@@ -262,34 +293,9 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
IFlushStrategy::SP strategy;
const ProtonConfig::Flush & flush(protonConfig.flush);
switch (flush.strategy) {
- case ProtonConfig::Flush::MEMORY: {
- size_t totalMaxMemory = flush.memory.maxmemory;
- if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) {
- LOG(warning, "flush.memory.maxmemory=%ld can not"
- " be set above the hard limit of %ld so we cap it",
- flush.memory.maxmemory,
- TOTAL_HARD_MEMORY_LIMIT);
- totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT;
- }
- size_t eachMaxMemory = flush.memory.each.maxmemory;
- if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) {
- LOG(warning, "flush.memory.each.maxmemory=%ld can not"
- " be set above the hard limit of %ld so we cap it",
- flush.memory.maxmemory,
- EACH_HARD_MEMORY_LIMIT);
- eachMaxMemory = EACH_HARD_MEMORY_LIMIT;
- }
+ case ProtonConfig::Flush::MEMORY:
strategy = std::make_shared<MemoryFlush>(
- MemoryFlush::Config(totalMaxMemory,
- flush.memory.maxtlssize,
- flush.memory.diskbloatfactor,
- eachMaxMemory,
- flush.memory.each.diskbloatfactor,
- flush.memory.maxage.serial,
- static_cast<long>
- (flush.memory.maxage.time) *
- fastos::TimeStamp::NANO));
- }
+ memoryFlushConfig(flush));
break;
case ProtonConfig::Flush::SIMPLE:
default:
@@ -299,6 +305,7 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
vespalib::mkdir(protonConfig.basedir + "/documents", true);
vespalib::chdir(protonConfig.basedir);
_tls->start();
+ _strategy = strategy;
_flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
strategy, flush.maxconcurrent, flush.idleinterval*1000, true));
_fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL));
@@ -538,6 +545,12 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot,
configSnapshot->getGeneration()));
_diskMemUsageSampler->
setConfig(diskMemUsageSamplerConfig(protonConfig));
+ std::shared_ptr<MemoryFlush> memoryFlushStrategy =
+ std::dynamic_pointer_cast<MemoryFlush>(_strategy);
+ if (memoryFlushStrategy) {
+ memoryFlushStrategy->setConfig(memoryFlushConfig(protonConfig.flush));
+ _flushEngine->kick();
+ }
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index 023a6173984..be8e602fa2a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -106,6 +106,7 @@ private:
MatchEngine::UP _matchEngine;
SummaryEngine::UP _summaryEngine;
DocsumBySlime::UP _docsumBySlime;
+ IFlushStrategy::SP _strategy;
FlushEngine::UP _flushEngine;
RPCHooks::UP _rpcHooks;
HealthAdapter _healthAdapter;