summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahoo-inc.com>2016-06-29 14:13:44 +0200
committerGeir Storli <geirst@yahoo-inc.com>2016-06-30 11:18:42 +0200
commitcf51b776825f9f70360bc264a8ac3e1f42dee982 (patch)
tree0ba8d0b9015ba854e927a0b1c2bd5aa23993de7a /searchcore
parentd33d88da5a845395273046ff34bee05c6cbbafd2 (diff)
Add class that listens to changes in disk/memory usage and updates config used by memory flush strategy.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/server/memory_flush_config_updater/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/proton/server/memory_flush_config_updater/DESC1
-rw-r--r--searchcore/src/tests/proton/server/memory_flush_config_updater/FILES1
-rw-r--r--searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp85
-rw-r--r--searchcore/src/vespa/searchcore/config/proton.def16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp84
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.h41
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/memoryflush.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.cpp55
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton.h3
12 files changed, 256 insertions, 46 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 3d72d70af48..46b6666639e 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -121,6 +121,7 @@ vespa_define_module(
src/tests/proton/server/data_directory_upgrader
src/tests/proton/server/disk_mem_usage_filter
src/tests/proton/server/health_adapter
+ src/tests/proton/server/memory_flush_config_updater
src/tests/proton/server/memoryflush
src/tests/proton/server/visibility_handler
src/tests/proton/statusreport
diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/CMakeLists.txt b/searchcore/src/tests/proton/server/memory_flush_config_updater/CMakeLists.txt
new file mode 100644
index 00000000000..5730e9181a5
--- /dev/null
+++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_memory_flush_config_updater_test_app TEST
+ SOURCES
+ memory_flush_config_updater_test.cpp
+ DEPENDS
+ searchcore_server
+ searchcore_flushengine
+)
+vespa_add_test(NAME searchcore_memory_flush_config_updater_test_app COMMAND searchcore_memory_flush_config_updater_test_app)
diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/DESC b/searchcore/src/tests/proton/server/memory_flush_config_updater/DESC
new file mode 100644
index 00000000000..b183c571bcc
--- /dev/null
+++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/DESC
@@ -0,0 +1 @@
+memory_flush_config_updater test. Take a look at memory_flush_config_updater_test.cpp for details.
diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/FILES b/searchcore/src/tests/proton/server/memory_flush_config_updater/FILES
new file mode 100644
index 00000000000..c36fcd27415
--- /dev/null
+++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/FILES
@@ -0,0 +1 @@
+memory_flush_config_updater_test.cpp
diff --git a/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp
new file mode 100644
index 00000000000..49d1016848e
--- /dev/null
+++ b/searchcore/src/tests/proton/server/memory_flush_config_updater/memory_flush_config_updater_test.cpp
@@ -0,0 +1,85 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/vespalib/testkit/testapp.h>
+
+#include <vespa/searchcore/proton/server/memory_flush_config_updater.h>
+
+using namespace proton;
+using vespa::config::search::core::ProtonConfig;
+
+ProtonConfig::Flush::Memory
+getConfig(int64_t maxMemory, int64_t conservativeMaxMemory, int64_t maxTlsSize, int64_t conservativeMaxTlsSize)
+{
+ ProtonConfig::Flush::Memory result;
+ result.maxmemory = maxMemory;
+ result.maxtlssize = maxTlsSize;
+ result.conservative.maxmemory = conservativeMaxMemory;
+ result.conservative.maxtlssize = conservativeMaxTlsSize;
+ return result;
+}
+
+ProtonConfig::Flush::Memory
+getDefaultConfig()
+{
+ return getConfig(4, 2, 20, 10);
+}
+
+struct Fixture
+{
+ MemoryFlush::SP strategy;
+ MemoryFlushConfigUpdater updater;
+ Fixture()
+ : strategy(std::make_shared<MemoryFlush>(MemoryFlushConfigUpdater::convertConfig(getDefaultConfig()))),
+ updater(strategy, getDefaultConfig())
+ {}
+ void assertStrategyConfig(int64_t expMaxGlobalMemory, int64_t expMaxGlobalTlsSize) {
+ EXPECT_EQUAL(expMaxGlobalMemory, strategy->getConfig().maxGlobalMemory);
+ EXPECT_EQUAL(expMaxGlobalTlsSize, strategy->getConfig().maxGlobalTlsSize);
+ }
+};
+
+TEST_F("require that strategy is updated when setting new config", Fixture)
+{
+ f.updater.setConfig(getConfig(5, 3, 30, 15));
+ TEST_DO(f.assertStrategyConfig(5, 30));
+}
+
+TEST_F("require that strategy is updated with normal values if no limits are reached", Fixture)
+{
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(false, false));
+ TEST_DO(f.assertStrategyConfig(4, 20));
+}
+
+TEST_F("require that strategy is updated with conservative max tls size value if disk limit is reached", Fixture)
+{
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(true, false));
+ TEST_DO(f.assertStrategyConfig(4, 10));
+}
+
+TEST_F("require that strategy is updated with conservative max memory value if memory limit is reached", Fixture)
+{
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(false, true));
+ TEST_DO(f.assertStrategyConfig(2, 20));
+}
+
+TEST_F("require that strategy is updated with all conservative values if both limits are reached", Fixture)
+{
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(true, true));
+ TEST_DO(f.assertStrategyConfig(2, 10));
+}
+
+TEST_F("require that last disk/memory usage state is remembered when setting new config", Fixture)
+{
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(true, false));
+ f.updater.setConfig(getConfig(5, 3, 30, 15));
+ TEST_DO(f.assertStrategyConfig(5, 15));
+}
+
+TEST_F("require that last config if remembered when setting new disk/memory usage state", Fixture)
+{
+ f.updater.setConfig(getConfig(5, 3, 30, 15));
+ f.updater.notifyDiskMemUsage(DiskMemUsageState(true, false));
+ TEST_DO(f.assertStrategyConfig(5, 15));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def
index 54bb54d5424..de661a634f6 100644
--- a/searchcore/src/vespa/searchcore/config/proton.def
+++ b/searchcore/src/vespa/searchcore/config/proton.def
@@ -56,13 +56,15 @@ flush.idleinterval double default=10.0 restart
## Which flushstrategy to use.
flush.strategy enum {SIMPLE, MEMORY} default=MEMORY restart
-## Total number of bytes allowed before forcing flush.
+## The total maximum memory (in bytes) used by FLUSH components before running flush.
+## A FLUSH component will free memory when flushed (e.g. memory index).
flush.memory.maxmemory long default=4294967296
## Maximum total disk bloat factor before forcing flush.
flush.memory.diskbloatfactor double default=0.2
-## Max disk usage (in bytes) for all transaction logs before forcing flush.
+## Max disk usage (in bytes) for all transaction logs before running flush.
+## In this case the oldest component is flushed such that transaction log can be pruned and disk freed.
flush.memory.maxtlssize long default=21474836480
## Number of bytes allowed per component before forcing memory prioritization.
@@ -78,6 +80,16 @@ flush.memory.maxage.time double default=86400.0
## Max diff in serial number allowed before that takes precedence.
flush.memory.maxage.serial long default=1000000
+## When resource limit for memory is reached we choose this conservative mode for the flush strategy.
+## The total maximum memory (in bytes) used by FLUSH components before running flush.
+## A FLUSH component will free memory when flushed (e.g. memory index).
+flush.memory.conservative.maxmemory long default=2147483648
+
+## When resource limit for disk is reached we choose this conservative mode for the flush strategy.
+## The max disk usage (int bytes) for all transaction logs before running flush.
+## In this case the oldest component is flushed such that transaction log can be pruned and disk freed.
+flush.memory.conservative.maxtlssize long default=10737418240
+
## The cost of doing replay when replaying the transaction log.
##
## The number of bytes to replay * replaycost gives an estimate of the
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index cc7ce7e8138..dbcdd8e2039 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -60,6 +60,7 @@ vespa_add_library(searchcore_server STATIC
matchers.cpp
matchhandlerproxy.cpp
matchview.cpp
+ memory_flush_config_updater.cpp
memoryflush.cpp
minimal_document_retriever.cpp
ooscli.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp
new file mode 100644
index 00000000000..45393d27c37
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.cpp
@@ -0,0 +1,84 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".proton.server.memory_flush_config_updater");
+#include "memory_flush_config_updater.h"
+
+namespace proton {
+
+void
+MemoryFlushConfigUpdater::updateFlushStrategy(const LockGuard &)
+{
+ MemoryFlush::Config newConfig = convertConfig(_currConfig);
+ if (_currState.aboveDiskLimit()) {
+ newConfig.maxGlobalTlsSize = _currConfig.conservative.maxtlssize;
+ }
+ if (_currState.aboveMemoryLimit()) {
+ newConfig.maxGlobalMemory = _currConfig.conservative.maxmemory;
+ }
+ _flushStrategy->setConfig(newConfig);
+}
+
+MemoryFlushConfigUpdater::MemoryFlushConfigUpdater(const MemoryFlush::SP &flushStrategy,
+ const ProtonConfig::Flush::Memory &config)
+ : _flushStrategy(flushStrategy),
+ _currConfig(config),
+ _currState()
+{
+}
+
+void
+MemoryFlushConfigUpdater::setConfig(const ProtonConfig::Flush::Memory &newConfig)
+{
+ LockGuard guard(_mutex);
+ _currConfig = newConfig;
+ updateFlushStrategy(guard);
+}
+
+void
+MemoryFlushConfigUpdater::notifyDiskMemUsage(DiskMemUsageState newState)
+{
+ LockGuard guard(_mutex);
+ _currState = newState;
+ updateFlushStrategy(guard);
+}
+
+namespace {
+
+static constexpr size_t TOTAL_HARD_MEMORY_LIMIT = 16 * 1024 * 1024 * 1024ul;
+static constexpr size_t EACH_HARD_MEMORY_LIMIT = 12 * 1024 * 1024 * 1024ul;
+
+}
+
+MemoryFlush::Config
+MemoryFlushConfigUpdater::convertConfig(const ProtonConfig::Flush::Memory &config)
+{
+ size_t totalMaxMemory = config.maxmemory;
+ if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) {
+ LOG(warning, "flush.memory.maxmemory=%ld cannot"
+ " be set above the hard limit of %ld (16GB) so we cap it to the hard limit",
+ config.maxmemory,
+ TOTAL_HARD_MEMORY_LIMIT);
+ totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT;
+ }
+ size_t eachMaxMemory = config.each.maxmemory;
+ if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) {
+ LOG(warning, "flush.memory.each.maxmemory=%ld cannot"
+ " be set above the hard limit of %ld (12GB) so we cap it to the hard limit",
+ config.maxmemory,
+ EACH_HARD_MEMORY_LIMIT);
+ eachMaxMemory = EACH_HARD_MEMORY_LIMIT;
+ }
+ return MemoryFlush::Config(totalMaxMemory,
+ config.maxtlssize,
+ config.diskbloatfactor,
+ eachMaxMemory,
+ config.each.diskbloatfactor,
+ config.maxage.serial,
+ static_cast<long>
+ (config.maxage.time) *
+ fastos::TimeStamp::NANO);
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.h b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.h
new file mode 100644
index 00000000000..be5fa704de9
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/memory_flush_config_updater.h
@@ -0,0 +1,41 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_disk_mem_usage_listener.h"
+#include "memoryflush.h"
+#include <vespa/searchcore/config/config-proton.h>
+#include <mutex>
+
+namespace proton {
+
+/**
+ * Class that listens to changes in disk and memory usage and
+ * updates the config used by memory flush strategy accordingly if we reach one of the resource limits.
+ */
+class MemoryFlushConfigUpdater : public IDiskMemUsageListener
+{
+private:
+ using Mutex = std::mutex;
+ using LockGuard = std::lock_guard<Mutex>;
+ using ProtonConfig = vespa::config::search::core::ProtonConfig;
+
+ MemoryFlush::SP _flushStrategy;
+ ProtonConfig::Flush::Memory _currConfig;
+ DiskMemUsageState _currState;
+ Mutex _mutex;
+
+ void updateFlushStrategy(const LockGuard &);
+
+public:
+ using UP = std::unique_ptr<MemoryFlushConfigUpdater>;
+
+ MemoryFlushConfigUpdater(const MemoryFlush::SP &flushStrategy,
+ const ProtonConfig::Flush::Memory &config);
+ void setConfig(const ProtonConfig::Flush::Memory &newConfig);
+ virtual void notifyDiskMemUsage(DiskMemUsageState newState) override;
+
+ static MemoryFlush::Config convertConfig(const ProtonConfig::Flush::Memory &config);
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
index 165ba4de70e..dc48b4718ed 100644
--- a/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
+++ b/searchcore/src/vespa/searchcore/proton/server/memoryflush.h
@@ -68,9 +68,9 @@ private:
const flushengine::TlsStatsMap &_tlsStatsMap;
};
- Config getConfig() const;
-
public:
+ using SP = std::shared_ptr<MemoryFlush>;
+
MemoryFlush();
MemoryFlush(const Config &config,
@@ -83,6 +83,7 @@ public:
tlsStatsMap) const override;
void setConfig(const Config &config);
+ Config getConfig() const;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
index 424029ba8a4..f576e53e96b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp
@@ -89,39 +89,6 @@ diskMemUsageSamplerConfig(const ProtonConfig &proton)
proton.writefilter.sampleinterval);
}
-static constexpr size_t TOTAL_HARD_MEMORY_LIMIT=16*1024*1024*1024ul;
-static constexpr size_t EACH_HARD_MEMORY_LIMIT=12*1024*1024*1024ul;
-
-MemoryFlush::Config
-memoryFlushConfig(const ProtonConfig::Flush &flush)
-{
- size_t totalMaxMemory = flush.memory.maxmemory;
- if (totalMaxMemory > TOTAL_HARD_MEMORY_LIMIT) {
- LOG(warning, "flush.memory.maxmemory=%ld can not"
- " be set above the hard limit of %ld so we cap it",
- flush.memory.maxmemory,
- TOTAL_HARD_MEMORY_LIMIT);
- totalMaxMemory = TOTAL_HARD_MEMORY_LIMIT;
- }
- size_t eachMaxMemory = flush.memory.each.maxmemory;
- if (eachMaxMemory > EACH_HARD_MEMORY_LIMIT) {
- LOG(warning, "flush.memory.each.maxmemory=%ld can not"
- " be set above the hard limit of %ld so we cap it",
- flush.memory.maxmemory,
- EACH_HARD_MEMORY_LIMIT);
- eachMaxMemory = EACH_HARD_MEMORY_LIMIT;
- }
- return MemoryFlush::Config(totalMaxMemory,
- flush.memory.maxtlssize,
- flush.memory.diskbloatfactor,
- eachMaxMemory,
- flush.memory.each.diskbloatfactor,
- flush.memory.maxage.serial,
- static_cast<long>
- (flush.memory.maxage.time) *
- fastos::TimeStamp::NANO);
-}
-
}
static const vespalib::string CUSTOM_COMPONENT_API_PATH = "/state/v1/custom/component";
@@ -207,6 +174,7 @@ Proton::Proton(const config::ConfigUri & configUri,
_matchEngine(),
_summaryEngine(),
_docsumBySlime(),
+ _memoryFlushConfigUpdater(),
_flushEngine(),
_rpcHooks(),
_healthAdapter(*this),
@@ -293,10 +261,14 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
IFlushStrategy::SP strategy;
const ProtonConfig::Flush & flush(protonConfig.flush);
switch (flush.strategy) {
- case ProtonConfig::Flush::MEMORY:
- strategy = std::make_shared<MemoryFlush>(
- memoryFlushConfig(flush));
+ case ProtonConfig::Flush::MEMORY: {
+ MemoryFlush::SP memoryFlush = std::make_shared<MemoryFlush>(
+ MemoryFlushConfigUpdater::convertConfig(flush.memory));
+ _memoryFlushConfigUpdater = std::make_unique<MemoryFlushConfigUpdater>(memoryFlush, flush.memory);
+ _diskMemUsageSampler->notifier().addDiskMemUsageListener(_memoryFlushConfigUpdater.get());
+ strategy = memoryFlush;
break;
+ }
case ProtonConfig::Flush::SIMPLE:
default:
strategy.reset(new SimpleFlush());
@@ -305,7 +277,6 @@ Proton::init(const BootstrapConfig::SP & configSnapshot)
vespalib::mkdir(protonConfig.basedir + "/documents", true);
vespalib::chdir(protonConfig.basedir);
_tls->start();
- _strategy = strategy;
_flushEngine.reset(new FlushEngine(std::make_shared<flushengine::TlsStatsFactory>(_tls->getTransLogServer()),
strategy, flush.maxconcurrent, flush.idleinterval*1000));
_fs4Server.reset(new TransportServer(*_matchEngine, *_summaryEngine, *this, protonConfig.ptport, TransportServer::DEBUG_ALL));
@@ -545,10 +516,8 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot,
configSnapshot->getGeneration()));
_diskMemUsageSampler->
setConfig(diskMemUsageSamplerConfig(protonConfig));
- std::shared_ptr<MemoryFlush> memoryFlushStrategy =
- std::dynamic_pointer_cast<MemoryFlush>(_strategy);
- if (memoryFlushStrategy) {
- memoryFlushStrategy->setConfig(memoryFlushConfig(protonConfig.flush));
+ if (_memoryFlushConfigUpdater) {
+ _memoryFlushConfigUpdater->setConfig(protonConfig.flush.memory);
_flushEngine->kick();
}
}
@@ -570,6 +539,7 @@ Proton::addDocumentDB(const DocTypeName & docTypeName,
configId.c_str());
addDocumentDB(*docType, configSnapshot, initializeThreads);
} else {
+
LOG(warning,
"Did not find document type '%s' in the document manager. "
"Skipping creating document database for this type",
@@ -617,6 +587,9 @@ Proton::~Proton()
if (_rpcHooks.get() != NULL) {
_rpcHooks->close();
}
+ if (_memoryFlushConfigUpdater) {
+ _diskMemUsageSampler->notifier().removeDiskMemUsageListener(_memoryFlushConfigUpdater.get());
+ }
_executor.shutdown();
_executor.sync();
_rpcHooks.reset();
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h
index be8e602fa2a..c9e70e60d09 100644
--- a/searchcore/src/vespa/searchcore/proton/server/proton.h
+++ b/searchcore/src/vespa/searchcore/proton/server/proton.h
@@ -12,6 +12,7 @@
#include <vespa/searchcore/proton/persistenceengine/persistenceengine.h>
#include <vespa/searchcore/proton/server/bootstrapconfigmanager.h>
#include <vespa/searchcore/proton/server/documentdb.h>
+#include <vespa/searchcore/proton/server/memory_flush_config_updater.h>
#include <vespa/searchcore/proton/server/protonconfigurer.h>
#include <vespa/searchcore/proton/server/rpc_hooks.h>
#include <vespa/searchcore/proton/summaryengine/summaryengine.h>
@@ -106,7 +107,7 @@ private:
MatchEngine::UP _matchEngine;
SummaryEngine::UP _summaryEngine;
DocsumBySlime::UP _docsumBySlime;
- IFlushStrategy::SP _strategy;
+ MemoryFlushConfigUpdater::UP _memoryFlushConfigUpdater;
FlushEngine::UP _flushEngine;
RPCHooks::UP _rpcHooks;
HealthAdapter _healthAdapter;