diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storageframework |
Publish
Diffstat (limited to 'storageframework')
103 files changed, 5877 insertions, 0 deletions
diff --git a/storageframework/.gitignore b/storageframework/.gitignore new file mode 100644 index 00000000000..a9b20e8992d --- /dev/null +++ b/storageframework/.gitignore @@ -0,0 +1,2 @@ +Makefile +Testing diff --git a/storageframework/CMakeLists.txt b/storageframework/CMakeLists.txt new file mode 100644 index 00000000000..706e81cb2de --- /dev/null +++ b/storageframework/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_define_module( + DEPENDS + fastos + vespalog + vespalib + metrics + + LIBS + src/vespa/storageframework + src/vespa/storageframework/defaultimplementation + src/vespa/storageframework/defaultimplementation/clock + src/vespa/storageframework/defaultimplementation/component + src/vespa/storageframework/defaultimplementation/memory + src/vespa/storageframework/defaultimplementation/thread + src/vespa/storageframework/generic + src/vespa/storageframework/generic/clock + src/vespa/storageframework/generic/component + src/vespa/storageframework/generic/memory + src/vespa/storageframework/generic/metric + src/vespa/storageframework/generic/status + src/vespa/storageframework/generic/thread + + TEST_EXTERNAL_DEPENDS + cppunit + vdstestlib + + TESTS + src/tests + src/tests/clock + src/tests/memory + src/tests/status + src/tests/thread +) diff --git a/storageframework/OWNERS b/storageframework/OWNERS new file mode 100644 index 00000000000..97c35339850 --- /dev/null +++ b/storageframework/OWNERS @@ -0,0 +1,2 @@ +vekterli +dybdahl diff --git a/storageframework/README b/storageframework/README new file mode 100644 index 00000000000..b24aedd74f3 --- /dev/null +++ b/storageframework/README @@ -0,0 +1,10 @@ +This module contains a generic application framework for a C++ server +process. It provides functionality such as metrics, status pages, memory +management and deadlock detection. + +This used to be coded into the storage module itself, but we want the generic +stuff to be available to others. The driver for moving is that storage now +splits its own memfile persistence layer out of storage, and we want both parts +to have the same utilities available. + +This module should hopefully not contain very storage specific stuff. diff --git a/storageframework/src/.gitignore b/storageframework/src/.gitignore new file mode 100644 index 00000000000..57b38cc4f9d --- /dev/null +++ b/storageframework/src/.gitignore @@ -0,0 +1,4 @@ +/Makefile.ini +/config_command.sh +/project.dsw +/storageframework.mak diff --git a/storageframework/src/tests/.gitignore b/storageframework/src/tests/.gitignore new file mode 100644 index 00000000000..9124b7b5751 --- /dev/null +++ b/storageframework/src/tests/.gitignore @@ -0,0 +1,5 @@ +/.depend +/Makefile +/test.vlog +/testrunner +storageframework_testrunner_app diff --git a/storageframework/src/tests/CMakeLists.txt b/storageframework/src/tests/CMakeLists.txt new file mode 100644 index 00000000000..863d5381d20 --- /dev/null +++ b/storageframework/src/tests/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(storageframework_testrunner_app + SOURCES + testrunner.cpp + DEPENDS + storageframework_testclock + storageframework_teststatus + storageframework_testmemory + storageframework_testthread + storageframework +) +vespa_add_test(NAME storageframework_testrunner_app COMMAND storageframework_testrunner_app) diff --git a/storageframework/src/tests/clock/.gitignore b/storageframework/src/tests/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/clock/CMakeLists.txt b/storageframework/src/tests/clock/CMakeLists.txt new file mode 100644 index 00000000000..1134cf8e88a --- /dev/null +++ b/storageframework/src/tests/clock/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testclock + SOURCES + timetest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/clock/timetest.cpp b/storageframework/src/tests/clock/timetest.cpp new file mode 100644 index 00000000000..c7319667ea8 --- /dev/null +++ b/storageframework/src/tests/clock/timetest.cpp @@ -0,0 +1,85 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct TimeTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testBasics(); + void testCreatedFromClock(); + void canAssignMicrosecondResolutionTimeToFakeClock(); + + CPPUNIT_TEST_SUITE(TimeTest); + CPPUNIT_TEST(testBasics); // Fails sometimes, test needs rewrite. + CPPUNIT_TEST(testCreatedFromClock); + CPPUNIT_TEST(canAssignMicrosecondResolutionTimeToFakeClock); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TimeTest); + +void +TimeTest::testBasics() +{ + SecondTime timeSec(1); + + MilliSecTime timeMillis = timeSec.getMillis(); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), timeMillis.getTime()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMillis.getSeconds()); + + MicroSecTime timeMicros = timeSec.getMicros(); + CPPUNIT_ASSERT_EQUAL(timeSec.getMicros(), timeMillis.getMicros()); + CPPUNIT_ASSERT_EQUAL(timeMillis, timeMicros.getMillis()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMicros.getSeconds()); + + MicroSecTime timeMicros2 = timeMicros; + CPPUNIT_ASSERT(timeMicros2 == timeMicros); + timeMicros2 += MicroSecTime(25000); + CPPUNIT_ASSERT(timeMicros2 > timeMicros); + CPPUNIT_ASSERT(timeMicros < timeMicros2); + timeMicros2 -= MicroSecTime(30000); + CPPUNIT_ASSERT(timeMicros2 < timeMicros); + CPPUNIT_ASSERT(timeMicros > timeMicros2); + timeMicros2 += MicroSecTime(55000); + + MilliSecTime timeMillis2 = timeMicros2.getMillis(); + CPPUNIT_ASSERT(timeMillis2 > timeMillis); + CPPUNIT_ASSERT_EQUAL(uint64_t(1050), timeMillis2.getTime()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMillis2.getSeconds()); +} + +void +TimeTest::testCreatedFromClock() +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInSeconds(600); + + CPPUNIT_ASSERT_EQUAL(SecondTime(600), SecondTime(clock)); + CPPUNIT_ASSERT_EQUAL(MilliSecTime(600 * 1000), MilliSecTime(clock)); + CPPUNIT_ASSERT_EQUAL(MicroSecTime(600 * 1000 * 1000), MicroSecTime(clock)); +} + +void +TimeTest::canAssignMicrosecondResolutionTimeToFakeClock() +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInMicroSeconds(1234567); // 1.234567 seconds + + // All non-microsec time points must necessarily be truncated. + CPPUNIT_ASSERT_EQUAL(SecondTime(1), SecondTime(clock)); + CPPUNIT_ASSERT_EQUAL(MilliSecTime(1234), MilliSecTime(clock)); + CPPUNIT_ASSERT_EQUAL(MicroSecTime(1234567), MicroSecTime(clock)); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/memory/.gitignore b/storageframework/src/tests/memory/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/memory/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/memory/CMakeLists.txt b/storageframework/src/tests/memory/CMakeLists.txt new file mode 100644 index 00000000000..97a7314f4cb --- /dev/null +++ b/storageframework/src/tests/memory/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testmemory + SOURCES + memorymanagertest.cpp + memorystatetest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/memory/memorymanagertest.cpp b/storageframework/src/tests/memory/memorymanagertest.cpp new file mode 100644 index 00000000000..3675cc55c2d --- /dev/null +++ b/storageframework/src/tests/memory/memorymanagertest.cpp @@ -0,0 +1,400 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> +#include <vespa/storageframework/defaultimplementation/memory/simplememorylogic.h> +#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/random.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct MemoryManagerTest : public CppUnit::TestFixture +{ + void testBasics(); + void testCacheAllocation(); + void testStress(); + + CPPUNIT_TEST_SUITE(MemoryManagerTest); + CPPUNIT_TEST(testBasics); + CPPUNIT_TEST(testCacheAllocation); + CPPUNIT_TEST(testStress); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MemoryManagerTest); + +void +MemoryManagerTest::testBasics() +{ + uint64_t maxMemory = 1000; + RealClock clock; + SimpleMemoryLogic* logic = new SimpleMemoryLogic(clock, maxMemory); + AllocationLogic::UP allLogic(std::move(logic)); + MemoryManager manager(std::move(allLogic)); + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("put", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& getAlloc(manager.registerAllocationType( + MemoryAllocationType("get", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& bufAlloc(manager.registerAllocationType( + MemoryAllocationType("buffer"))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("cache", MemoryAllocationType::CACHE))); + const MemoryState& state(logic->getState()); + const MemoryState::SnapShot& current(state.getCurrentSnapshot()); + // Basics + { + // * Getting a token, and release it back with correct behavior + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 0, 100, 80); + CPPUNIT_ASSERT(put.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(100), put->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(100), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(900), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + + // * Do the same while not being empty. Different type. + framework::MemoryToken::UP get = manager.allocate(getAlloc, + 30, 200, 50); + CPPUNIT_ASSERT(get.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), get->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(300), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(700), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + + // * Do the same while not being empty. Same type. + framework::MemoryToken::UP get2 = manager.allocate( + getAlloc, + 70, + 150, + 60); + + CPPUNIT_ASSERT(get2.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(150), get2->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(450), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(550), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Non-external load + // * Getting minimum when going beyond 80% full + { + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 795, 795, 90); + framework::MemoryToken::UP resize = manager.allocate( + bufAlloc, 10, 90, 80); + CPPUNIT_ASSERT(resize.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(10), resize->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(805), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(195), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Non-external load + // * Getting up to threshold if hitting it + { + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 750, 750, 90); + framework::MemoryToken::UP resize = manager.allocate( + bufAlloc, 10, 90, 80); + CPPUNIT_ASSERT(resize.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(50), resize->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(800), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // External load + { + // * Stopped when going beyond 80% full + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 795, 795, 90); + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 10, 100, 80); + CPPUNIT_ASSERT(put.get() == 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(795), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(205), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // External load + { + // * Getting up to threshold if hitting it + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 750, 750, 90); + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 10, 100, 80); + CPPUNIT_ASSERT(put.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(50), put->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(800), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Test caching.. + { + // Cache paradigm: + // Allocate a token taking up no space at all. + // Give it to your ReduceMemoryUsageInterface implementation. + // Run resize on your token in that implementation to get memory and + // return memory. That way locking should be easy when needed. + struct ReduceI : public framework::ReduceMemoryUsageInterface { + framework::MemoryToken::UP _token; + + virtual uint64_t reduceMemoryConsumption(const MemoryToken& token, + uint64_t reduceBy) + { + assert(&token == _token.get()); + (void) &token; + assert(_token->getSize() >= reduceBy); + return reduceBy; + } + }; + ReduceI reducer; + framework::MemoryToken::UP cache = manager.allocate(cacheAlloc, + 0, 0, 0, &reducer); + CPPUNIT_ASSERT(cache.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), cache->getSize()); + reducer._token = std::move(cache); + for (uint32_t i=1; i<=50; ++i) { + bool success = reducer._token->resize(i * 10, i * 10); + CPPUNIT_ASSERT_EQUAL(true, success); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(500), reducer._token->getSize()); + + // * Ordered to free space + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 600, 600, 80); + CPPUNIT_ASSERT_EQUAL_MSG(manager.toString(), + uint64_t(400), reducer._token->getSize()); + CPPUNIT_ASSERT_EQUAL_MSG(manager.toString(), + uint64_t(600), put->getSize()); + } + CPPUNIT_ASSERT_EQUAL_MSG(state.toString(true), + uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL_MSG(state.toString(true), + uint64_t(0), current.getUserCount()); + + // Test merge and tracking of allocation counts with merge, by doing + // operations with tokens and see that user count and used size + // correctly go back to zero. + { + framework::MemoryToken::UP tok1( + manager.allocate(putAlloc, 5, 5, 40)); + framework::MemoryToken::UP tok2( + manager.allocate(putAlloc, 10, 10, 40)); + framework::MemoryToken::UP tok3( + manager.allocate(putAlloc, 20, 20, 40)); + framework::MemoryToken::UP tok4( + manager.allocate(putAlloc, 40, 40, 40)); + framework::MemoryToken::UP tok5( + manager.allocate(putAlloc, 80, 80, 40)); + framework::MemoryToken::UP tok6( + manager.allocate(putAlloc, 1, 1, 40)); + framework::MemoryToken::UP tok7( + manager.allocate(putAlloc, 3, 3, 40)); + } +} + +void +MemoryManagerTest::testCacheAllocation() +{ + uint64_t maxMemory = 3000; + + RealClock clock; + SimpleMemoryLogic::UP logic(new PriorityMemoryLogic(clock, maxMemory)); + logic->setCacheThreshold(1.0); + + AllocationLogic::UP allLogic(std::move(logic)); + MemoryManager manager(std::move(allLogic)); + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("put", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("cache", MemoryAllocationType::CACHE))); + + framework::MemoryToken::UP token = + manager.allocate(putAlloc, + 50, + 50, + 127); + + CPPUNIT_ASSERT_EQUAL(50, (int)token->getSize()); + + framework::MemoryToken::UP token2 = + manager.allocate(cacheAlloc, + 1000, + 2000, + 127); + + CPPUNIT_ASSERT_EQUAL(2000, (int)token2->getSize()); + + token2->resize(2000, 3000); + + CPPUNIT_ASSERT_EQUAL(2950, (int)token2->getSize()); +} + +namespace { +struct MemoryManagerLoadGiver : public document::Runnable, + public ReduceMemoryUsageInterface +{ + MemoryManager& _manager; + const framework::MemoryAllocationType& _type; + uint8_t _priority; + uint32_t _minMem; + uint32_t _maxMem; + uint32_t _failed; + uint32_t _ok; + uint32_t _reduced; + typedef vespalib::LinkedPtr<MemoryToken> MemoryTokenPtr; + std::vector<MemoryTokenPtr> _tokens; + vespalib::Lock _cacheLock; + + MemoryManagerLoadGiver( + MemoryManager& manager, + const framework::MemoryAllocationType& type, + uint8_t priority, + uint32_t minMem, + uint32_t maxMem, + uint32_t tokensToKeep) + : _manager(manager), + _type(type), + _priority(priority), + _minMem(minMem), + _maxMem(maxMem), + _failed(0), + _ok(0), + _reduced(0), + _tokens(tokensToKeep) + { + } + + uint64_t reduceMemoryConsumption(const MemoryToken&, uint64_t reduceBy) { + ++_reduced; + return reduceBy; + } + + void run() { + ReduceMemoryUsageInterface* reducer = 0; + if (_type.isCache()) reducer = this; + vespalib::RandomGen randomizer; + while (running()) { + vespalib::Lock lock(_cacheLock); + framework::MemoryToken::UP token = _manager.allocate( + _type, _minMem, _maxMem, _priority, reducer); + if (token.get() == 0) { + ++_failed; + } else { + ++_ok; + } + uint32_t index = randomizer.nextUint32(0, _tokens.size() - 1); + _tokens[index] = MemoryTokenPtr(token.release()); + } + } +}; +} + +void +MemoryManagerTest::testStress() +{ + uint64_t stressTimeMS = 1 * 1000; + uint64_t maxMemory = 1 * 1024 * 1024; + RealClock clock; + AllocationLogic::UP logic(new PriorityMemoryLogic(clock, maxMemory)); + MemoryManager manager(std::move(logic)); + + FastOS_ThreadPool pool(128 * 1024); + std::vector<MemoryManagerLoadGiver*> loadGivers; + for (uint32_t type = 0; type < 5; ++type) { + const MemoryAllocationType* allocType = 0; + uint32_t min = 1000, max = 5000; + if (type == 0) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "default")); + } else if (type == 1) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "external", MemoryAllocationType::EXTERNAL_LOAD)); + } else if (type == 2) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "forced", MemoryAllocationType::FORCE_ALLOCATE)); + } else if (type == 3) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "forcedExternal", MemoryAllocationType::FORCE_ALLOCATE + | MemoryAllocationType::EXTERNAL_LOAD)); + } else if (type == 4) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "cache", MemoryAllocationType::CACHE)); + max = 30000; + } + for (int priority = 0; priority < 256; priority += 8) { + loadGivers.push_back(new MemoryManagerLoadGiver( + manager, *allocType, priority, min, max, 10)); + loadGivers.back()->start(pool); + } + FastOS_Thread::Sleep(stressTimeMS); + } + FastOS_Thread::Sleep(5 * stressTimeMS); + uint64_t okTotal = 0, failedTotal = 0, reducedTotal = 0; + for (uint32_t i = 0; i < loadGivers.size(); i++) { + /* + fprintf(stderr, "%d %s-%u: Failed %d, ok %d, reduced %d\n", + i, loadGivers[i]->_type.getName().c_str(), + uint32_t(loadGivers[i]->_priority), + loadGivers[i]->_failed, loadGivers[i]->_ok, + loadGivers[i]->_reduced); // */ + okTotal += loadGivers[i]->_ok; + failedTotal += loadGivers[i]->_failed; + reducedTotal += loadGivers[i]->_reduced; + } + for (uint32_t i = 0; i < loadGivers.size(); i++) loadGivers[i]->stop(); + for (uint32_t i = 0; i < loadGivers.size(); i++) loadGivers[i]->join(); + pool.Close(); + + /* + bool verbose = false; + std::cerr << "\n\nMemory allocations at end of load:\n"; + manager.print(std::cerr, verbose, ""); // */ + + for (uint32_t i = 0; i < loadGivers.size(); i++) { + loadGivers[i]->_tokens.clear(); + } + for (uint32_t i = 0; i < loadGivers.size(); i++) { + delete loadGivers[i]; + } + loadGivers.clear(); + + //std::cerr << "\n\nMemory allocations at end of testl:\n"; + //manager.print(std::cerr, verbose, ""); + + std::cerr << "\n Managed " << std::fixed + << (okTotal / (stressTimeMS / 1000)) + << " ok, " << (failedTotal / (stressTimeMS / 1000)) + << " failed and " << (reducedTotal / (stressTimeMS / 1000)) + << " reduced allocations/s.\n "; + + MemoryState state(clock, 1); + manager.getState(state); + const MemoryState::SnapShot& current(state.getCurrentSnapshot()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSizeIgnoringCache()); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/memory/memorystatetest.cpp b/storageframework/src/tests/memory/memorystatetest.cpp new file mode 100644 index 00000000000..9703d7d0133 --- /dev/null +++ b/storageframework/src/tests/memory/memorystatetest.cpp @@ -0,0 +1,176 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/memory/memorystate.h> +#include <vespa/storageframework/generic/memory/memorymanagerinterface.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct MemoryStateTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testBasics(); + + CPPUNIT_TEST_SUITE(MemoryStateTest); + CPPUNIT_TEST(testBasics); // Fails sometimes, test needs rewrite. + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MemoryStateTest); + +class SimpleMemoryManager : public framework::MemoryManagerInterface +{ +private: + std::map<std::string, framework::MemoryAllocationType> _types; + +public: + virtual void setMaximumMemoryUsage(uint64_t max) { (void) max; } + + virtual const framework::MemoryAllocationType& + registerAllocationType(const framework::MemoryAllocationType& type) { + _types[type.getName()] = type; + return _types[type.getName()]; + } + + virtual const framework::MemoryAllocationType& + getAllocationType(const std::string& name) const { + std::map<std::string, framework::MemoryAllocationType>::const_iterator iter = + _types.find(name); + + if (iter == _types.end()) { + throw vespalib::IllegalArgumentException("Allocation type not found: " + name); + } + + return iter->second; + } + + virtual std::vector<const MemoryAllocationType*> getAllocationTypes() const + { + std::vector<const MemoryAllocationType*> types; + for(std::map<std::string, framework::MemoryAllocationType> + ::const_iterator it = _types.begin(); it != _types.end(); ++it) + { + types.push_back(&it->second); + } + return types; + } + + framework::MemoryToken::UP allocate(const framework::MemoryAllocationType&, + uint64_t, + uint64_t, + uint8_t, + framework::ReduceMemoryUsageInterface*) + { + return framework::MemoryToken::UP(); + } + + uint64_t getMemorySizeFreeForPriority(uint8_t priority) const { + (void) priority; + return 0; + } +}; + +void +MemoryStateTest::testBasics() +{ + SimpleMemoryManager manager; + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_PUT", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& getAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_GET", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& blockAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_DOCBLOCK"))); + const MemoryAllocationType& databaseAlloc(manager.registerAllocationType( + MemoryAllocationType("DATABASE"))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("SLOTFILE_CACHE", MemoryAllocationType::CACHE))); + + uint32_t maxMemory = 1024; + + RealClock clock; + MemoryState state1(clock, maxMemory); + MemoryState state2(clock, maxMemory); + + state1.setMinJumpToUpdateMax(50); + + state1.addToEntry(putAlloc, 100, 10, + MemoryState::GOT_MAX, false); + state1.addToEntry(putAlloc, 100, 60, + MemoryState::GOT_MAX, false); + state1.addToEntry(blockAlloc, + 200, 20, + MemoryState::GOT_MIN, false); + state1.addToEntry(getAlloc, 0, 15, + MemoryState::DENIED, false, 0); + state1.addToEntry(databaseAlloc, 150, 0, + MemoryState::DENIED, true, 1); + state1.addToEntry(cacheAlloc, 45, 0, + MemoryState::GOT_MAX, true, 1); + + state2.addToEntry(putAlloc, 50, 10, + MemoryState::GOT_MIN, false); + state2.addToEntry(putAlloc, 20, 40, + MemoryState::GOT_MIN, false); + + state1.removeFromEntry(databaseAlloc, 25, 0, 0); + state1.removeFromEntry(putAlloc, 100, 60); + + MemoryState::SnapShot state3; + state3 = state1.getMaxSnapshot(); + state3 += state2.getMaxSnapshot(); + + std::string expected; + expected = + "\n" + "MemoryState(Max memory: 1024) {\n" + " Current: SnapShot(Used 470, w/o cache 425) {\n" + " Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" + " DATABASE(0): Used(125 B / 1) Stats(1, 0, 0, 1, 1)\n" + " MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" + " MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" + " MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " MESSAGE_PUT(60): Used(0 B / 0) Stats(1, 1, 0, 0, 0)\n" + " SLOTFILE_CACHE(0): Used(45 B / 1) Stats(1, 1, 0, 0, 1)\n" + " }\n" + " Max: SnapShot(Used 550, w/o cache 550) {\n" + " Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" + " DATABASE(0): Used(150 B / 1) Stats(1, 0, 0, 1, 1)\n" + " MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" + " MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" + " MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " MESSAGE_PUT(60): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " }\n" + "}"; + + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state1.toString(true)); + expected = "\n" +"MemoryState(Max memory: 1024) {\n" +" Current: SnapShot(Used 70, w/o cache 70) {\n" +" Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" +" MESSAGE_PUT(10): Used(50 B / 1) Stats(1, 0, 1, 0, 0)\n" +" MESSAGE_PUT(40): Used(20 B / 1) Stats(1, 0, 1, 0, 0)\n" +" }\n" +"}"; + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state2.toString(true)); + expected = "\n" +"SnapShot(Used 550, w/o cache 550) {\n" +" Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" +" DATABASE(0): Used(150 B / 1) Stats(1, 0, 0, 1, 1)\n" +" MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" +" MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" +" MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" +" MESSAGE_PUT(60): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" +"}"; + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state3.toString(true)); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/status/.gitignore b/storageframework/src/tests/status/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/status/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/status/CMakeLists.txt b/storageframework/src/tests/status/CMakeLists.txt new file mode 100644 index 00000000000..2c829d48810 --- /dev/null +++ b/storageframework/src/tests/status/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_teststatus + SOURCES + htmlstatustest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/status/htmlstatustest.cpp b/storageframework/src/tests/status/htmlstatustest.cpp new file mode 100644 index 00000000000..2c3ef818e8e --- /dev/null +++ b/storageframework/src/tests/status/htmlstatustest.cpp @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> + +namespace storage { +namespace framework { + +struct HtmlStatusTest : public CppUnit::TestFixture { + + void testHtmlStatus(); + + CPPUNIT_TEST_SUITE(HtmlStatusTest); + CPPUNIT_TEST(testHtmlStatus); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(HtmlStatusTest); + +void +HtmlStatusTest::testHtmlStatus() +{ +} + +} // framework +} // storage diff --git a/storageframework/src/tests/testrunner.cpp b/storageframework/src/tests/testrunner.cpp new file mode 100644 index 00000000000..16027870c47 --- /dev/null +++ b/storageframework/src/tests/testrunner.cpp @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <iostream> +#include <vespa/log/log.h> +#include <vespa/vdstestlib/cppunit/cppunittestrunner.h> + +LOG_SETUP("persistencecppunittests"); + +int +main(int argc, char **argv) +{ + vdstestlib::CppUnitTestRunner testRunner; + return testRunner.run(argc, argv); +} diff --git a/storageframework/src/tests/thread/.gitignore b/storageframework/src/tests/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/thread/CMakeLists.txt b/storageframework/src/tests/thread/CMakeLists.txt new file mode 100644 index 00000000000..961b49d065a --- /dev/null +++ b/storageframework/src/tests/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testthread + SOURCES + tickingthreadtest.cpp + taskthreadtest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/thread/taskthreadtest.cpp b/storageframework/src/tests/thread/taskthreadtest.cpp new file mode 100644 index 00000000000..6b524f10cf5 --- /dev/null +++ b/storageframework/src/tests/thread/taskthreadtest.cpp @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/thread/taskthread.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { + +struct TaskThreadTest : public CppUnit::TestFixture +{ + + void testNormalUsage(); + + CPPUNIT_TEST_SUITE(TaskThreadTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TaskThreadTest); + +namespace { + struct Task { + std::string _name; + uint8_t _priority; + + Task(const std::string& name, uint8_t priority) + : _name(name), _priority(priority) {} + + bool operator<(const Task& other) const { + return (_priority > other._priority); + } + uint8_t getPriority() const { return _priority; } + }; + + struct MyThread : public TaskThread<Task> { + MyThread(ThreadLock& lock) : TaskThread<Task>(lock) {} + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) { + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + }; +} + +void +TaskThreadTest::testNormalUsage() +{ + TickingThreadPool::UP pool(TickingThreadPool::createDefault("testApp")); + + MyThread t(*pool); + t.addTask(Task("a", 6)); + t.addTask(Task("b", 3)); + t.addTask(Task("c", 8)); + t.addTask(Task("d", 4)); + CPPUNIT_ASSERT(t.empty()); // Still empty before critical tick has run + dynamic_cast<TickingThread&>(t).doCriticalTick(0); + CPPUNIT_ASSERT(!t.empty()); + CPPUNIT_ASSERT_EQUAL(3, (int) t.peek().getPriority()); + std::ostringstream ost; + while (!t.empty()) { + Task task(t.peek()); + ost << task._name << '(' << ((int) task.getPriority()) << ") "; + t.pop(); + } + CPPUNIT_ASSERT_EQUAL(std::string("b(3) d(4) a(6) c(8) "), ost.str()); +} + + +} // framework +} // storage diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp new file mode 100644 index 00000000000..ca15263d446 --- /dev/null +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -0,0 +1,370 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h> +#include <vespa/storageframework/generic/thread/tickingthread.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +LOG_SETUP(".tickingthreadtest"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct TickingThreadTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testTicksBeforeWaitBasic(); + void testTicksBeforeWaitLiveUpdate(); + void testDestroyWithoutStarting(); + void testVerboseStopping(); + void testStopOnDeletion(); + void testLockAllTicks(); + void testLockCriticalTicks(); + void testFailsOnStartWithoutThreads(); + void testBroadcast(); + + CPPUNIT_TEST_SUITE(TickingThreadTest); + CPPUNIT_TEST(testTicksBeforeWaitBasic); + CPPUNIT_TEST(testTicksBeforeWaitLiveUpdate); + CPPUNIT_TEST(testDestroyWithoutStarting); + CPPUNIT_TEST(testVerboseStopping); + CPPUNIT_TEST(testStopOnDeletion); + CPPUNIT_TEST(testLockAllTicks); + CPPUNIT_TEST(testLockCriticalTicks); + CPPUNIT_TEST(testFailsOnStartWithoutThreads); + CPPUNIT_TEST(testBroadcast); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TickingThreadTest); + +namespace { + +struct Context { + uint64_t _critTickCount; + uint64_t _nonCritTickCount; + + Context() : _critTickCount(0), _nonCritTickCount(0) {} +}; + +struct MyApp : public TickingThread { + uint32_t _critOverlapCounter; + bool _doCritOverlapTest; + bool _critOverlap; + std::vector<Context> _context; + TickingThreadPool::UP _threadPool; + + MyApp(int threadCount, bool doCritOverlapTest = false) + : _critOverlapCounter(0), + _doCritOverlapTest(doCritOverlapTest), + _critOverlap(false), + _threadPool(TickingThreadPool::createDefault("testApp")) + { + for (int i=0; i<threadCount; ++i) { + _threadPool->addThread(*this); + _context.push_back(Context()); + } + } + + void start(ThreadPool& p) { _threadPool->start(p); } + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex index) { + assert(index < _context.size()); + Context& c(_context[index]); + if (_doCritOverlapTest) { + uint32_t oldTick = _critOverlapCounter; + FastOS_Thread::Sleep(1); + _critOverlap |= (_critOverlapCounter != oldTick); + ++_critOverlapCounter; + } + ++c._critTickCount; + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex index) { + assert(index < _context.size()); + Context& c(_context[index]); + ++c._nonCritTickCount; + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + + uint64_t getMinCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, _context[i]._critTickCount); + } + return min; + } + uint64_t getMinNonCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, _context[i]._critTickCount); + } + return min; + } + uint64_t getTotalCritTicks() { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += _context[i]._critTickCount; + } + return total; + } + uint64_t getTotalNonCritTicks() { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += _context[i]._nonCritTickCount; + } + return total; + } + uint64_t getTotalTicks() + { return getTotalCritTicks() + getTotalNonCritTicks(); } + bool hasCritOverlap() { return _critOverlap; } +}; + +} + +void +TickingThreadTest::testTicksBeforeWaitBasic() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 1; + MyApp app(threadCount); + app.start(testReg.getThreadPoolImpl()); + + // Default behaviour is 5ms sleep before each tick. Let's do 20 ticks, + // and verify time is in right ballpark. + int totalSleepMs = 0; + while (app.getTotalNonCritTicks() < 20) { + FastOS_Thread::Sleep(1); + totalSleepMs++; + } + CPPUNIT_ASSERT(totalSleepMs > 10); + app._threadPool->stop(); +} + +void +TickingThreadTest::testTicksBeforeWaitLiveUpdate() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 1; + MyApp app(threadCount); + // Configure thread pool to send bulks of 5000 ticks each second. + long unsigned int ticksBeforeWaitMs = 5000; + MilliSecTime waitTimeMs(1000); + MilliSecTime maxProcessingTime(234234); + app.start(testReg.getThreadPoolImpl()); + app._threadPool->updateParametersAllThreads( + waitTimeMs, maxProcessingTime, ticksBeforeWaitMs); + + // Check that 5000 ticks are received instantly (usually <2 ms) + // (if live update is broken it will take more than an hour). + int maxAttempts = 120000; // a bit more than 120 secs + while (app.getTotalNonCritTicks() < ticksBeforeWaitMs && maxAttempts-->0) { + FastOS_Thread::Sleep(1); + } + + CPPUNIT_ASSERT(maxAttempts>0); + CPPUNIT_ASSERT(app.getTotalNonCritTicks() >= ticksBeforeWaitMs); + app._threadPool->stop(); +} + +void +TickingThreadTest::testDestroyWithoutStarting() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); +} + +void +TickingThreadTest::testVerboseStopping() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + FastOS_Thread::Sleep(1); + } + app._threadPool->stop(); +} + +void +TickingThreadTest::testStopOnDeletion() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + FastOS_Thread::Sleep(1); + } +} + +void +TickingThreadTest::testLockAllTicks() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app1(threadCount); + MyApp app2(threadCount); + app1.start(testReg.getThreadPoolImpl()); + app2.start(testReg.getThreadPoolImpl()); + while (std::min(app1.getMinCritTick(), app2.getMinCritTick()) < 5) { + FastOS_Thread::Sleep(1); + } + uint64_t ticks1, ticks2; + { + TickingLockGuard guard(app1._threadPool->freezeAllTicks()); + ticks1 = app1.getTotalTicks(); + ticks2 = app2.getTotalTicks(); + + while (app2.getMinCritTick() < 2 * ticks2 / threadCount) { + FastOS_Thread::Sleep(1); + } + CPPUNIT_ASSERT_EQUAL(ticks1, app1.getTotalTicks()); + } + while (app1.getMinCritTick() < 2 * ticks1 / threadCount) { + FastOS_Thread::Sleep(1); + } +} + +void +TickingThreadTest::testLockCriticalTicks() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + uint64_t iterationsBeforeOverlap = 0; + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (!app.hasCritOverlap()) { + FastOS_Thread::Sleep(1); + ++app._critOverlapCounter; + ++iterationsBeforeOverlap; + } + } + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + for (uint64_t i=0; i<iterationsBeforeOverlap * 10; ++i) { + FastOS_Thread::Sleep(1); + TickingLockGuard guard(app._threadPool->freezeCriticalTicks()); + for (int j=0; j<threadCount; ++j) { + ++app._context[j]._critTickCount; + } + CPPUNIT_ASSERT(!app.hasCritOverlap()); + } + } +} + +void +TickingThreadTest::testFailsOnStartWithoutThreads() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 0; + MyApp app(threadCount, true); + try{ + app.start(testReg.getThreadPoolImpl()); + CPPUNIT_FAIL("Expected starting without threads to fail"); + } catch (vespalib::Exception& e) { + CPPUNIT_ASSERT_EQUAL(vespalib::string( + "Makes no sense to start threadpool without threads"), + e.getMessage()); + } +} + +namespace { + +RealClock clock; + +void printTaskInfo(const std::string& task, const char* action) { + vespalib::string msg = vespalib::make_string( + "%" PRIu64 ": %s %s\n", + clock.getTimeInMicros().getTime(), + task.c_str(), + action); + // std::cerr << msg; +} + +struct BroadcastApp : public TickingThread { + std::vector<std::string> _queue; + std::vector<std::string> _active; + std::vector<std::string> _processed; + TickingThreadPool::UP _threadPool; + + // Set a huge wait time by default to ensure we have to notify + BroadcastApp() + : _threadPool(TickingThreadPool::createDefault( + "testApp", MilliSecTime(300000))) + { + _threadPool->addThread(*this); + } + + void start(ThreadPool& p) { _threadPool->start(p); } + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex) { + if (!_queue.empty()) { + for (uint32_t i=0; i<_queue.size(); ++i) { + printTaskInfo(_queue[i], "activating"); + _active.push_back(_queue[i]); + } + _queue.clear(); + return ThreadWaitInfo::MORE_WORK_ENQUEUED; + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) { + if (!_active.empty()) { + for (uint32_t i=0; i<_active.size(); ++i) { + printTaskInfo(_queue[i], "processing"); + _processed.push_back(_active[i]); + } + _active.clear(); + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + + void doTask(const std::string& task) { + printTaskInfo(task, "enqueue"); + TickingLockGuard guard(_threadPool->freezeCriticalTicks()); + _queue.push_back(task); + guard.broadcast(); + } +}; + +} + + +void +TickingThreadTest::testBroadcast() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + BroadcastApp app; + app.start(testReg.getThreadPoolImpl()); + app.doTask("foo"); + FastOS_Thread::Sleep(1); + app.doTask("bar"); + FastOS_Thread::Sleep(1); + app.doTask("baz"); + FastOS_Thread::Sleep(1); + app.doTask("hmm"); + FastOS_Thread::Sleep(1); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/.gitignore b/storageframework/src/vespa/storageframework/.gitignore new file mode 100644 index 00000000000..fb3ac588d47 --- /dev/null +++ b/storageframework/src/vespa/storageframework/.gitignore @@ -0,0 +1,4 @@ +/.depend +/Makefile +/features.h +/libstorageframework.so.5.1 diff --git a/storageframework/src/vespa/storageframework/CMakeLists.txt b/storageframework/src/vespa/storageframework/CMakeLists.txt new file mode 100644 index 00000000000..5b57cc9c2a1 --- /dev/null +++ b/storageframework/src/vespa/storageframework/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework INTERFACE + SOURCES + INSTALL lib64 + DEPENDS + storageframework_defaultimplementation + storageframework_generic +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/.gitignore b/storageframework/src/vespa/storageframework/defaultimplementation/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/CMakeLists.txt b/storageframework/src/vespa/storageframework/defaultimplementation/CMakeLists.txt new file mode 100644 index 00000000000..51e992b134d --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_defaultimplementation + SOURCES + $<TARGET_OBJECTS:storageframework_clockimpl> + $<TARGET_OBJECTS:storageframework_componentimpl> + $<TARGET_OBJECTS:storageframework_memoryimpl> + $<TARGET_OBJECTS:storageframework_threadimpl> + INSTALL lib64 + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/.gitignore b/storageframework/src/vespa/storageframework/defaultimplementation/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt b/storageframework/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt new file mode 100644 index 00000000000..239b8af86d2 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_clockimpl OBJECT + SOURCES + realclock.cpp + fakeclock.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp new file mode 100644 index 00000000000..cc9b01add51 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp @@ -0,0 +1,18 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +FakeClock::FakeClock(Mode m, framework::MicroSecTime startTime) + : _mode(m), + _absoluteTime(startTime), + _cycleCount(0) +{ +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h b/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h new file mode 100644 index 00000000000..f15414b3f51 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::FakeClock + * \ingroup test + * + * \brief Implements a fake clock to use for testing. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/storageframework/storageframework.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct FakeClock : public framework::Clock { + enum Mode { + FAKE_ABSOLUTE, // Time is always equal to supplied absolute time + FAKE_ABSOLUTE_CYCLE // Time is equal to absolute time + counter that + // increase for each request so you never get same + // timestamp twice. + }; + +private: + Mode _mode; + framework::MicroSecTime _absoluteTime; + mutable time_t _cycleCount; + vespalib::Lock _lock; + +public: + FakeClock(Mode m = FAKE_ABSOLUTE, + framework::MicroSecTime startTime = framework::MicroSecTime(1)); + + void setMode(Mode m) { + vespalib::LockGuard guard(_lock); + _mode = m; + } + virtual void setFakeCycleMode() { setMode(FAKE_ABSOLUTE_CYCLE); } + + virtual void setAbsoluteTimeInSeconds(uint32_t seconds) { + vespalib::LockGuard guard(_lock); + _absoluteTime = framework::MicroSecTime(seconds * uint64_t(1000000)); + _cycleCount = 0; + _mode = FAKE_ABSOLUTE; + } + + virtual void setAbsoluteTimeInMicroSeconds(uint64_t usecs) { + vespalib::LockGuard guard(_lock); + _absoluteTime = framework::MicroSecTime(usecs); + _cycleCount = 0; + _mode = FAKE_ABSOLUTE; + } + + virtual void addMilliSecondsToTime(uint64_t ms) { + vespalib::LockGuard guard(_lock); + _absoluteTime += framework::MicroSecTime(ms * 1000); + } + + virtual void addSecondsToTime(uint32_t nr) { + vespalib::LockGuard guard(_lock); + _absoluteTime += framework::MicroSecTime(nr * uint64_t(1000000)); + } + + virtual framework::MicroSecTime getTimeInMicros() const { + vespalib::LockGuard guard(_lock); + if (_mode == FAKE_ABSOLUTE) return _absoluteTime; + return _absoluteTime + framework::MicroSecTime(1000000 * _cycleCount++); + } + virtual framework::MilliSecTime getTimeInMillis() const { + return getTimeInMicros().getMillis(); + } + virtual framework::SecondTime getTimeInSeconds() const { + return getTimeInMicros().getSeconds(); + } +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp new file mode 100644 index 00000000000..e9348512427 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> + +#include <sys/time.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +MicroSecTime +RealClock::getTimeInMicros() const +{ + struct timeval mytime; + gettimeofday(&mytime, 0); + return MicroSecTime(mytime.tv_sec * 1000000llu + mytime.tv_usec); +} + +MilliSecTime +RealClock::getTimeInMillis() const +{ + struct timeval mytime; + gettimeofday(&mytime, 0); + return MilliSecTime( + mytime.tv_sec * 1000llu + mytime.tv_usec / 1000); +} + +SecondTime +RealClock::getTimeInSeconds() const +{ + struct timeval mytime; + gettimeofday(&mytime, 0); + return SecondTime(mytime.tv_sec); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.h new file mode 100644 index 00000000000..633ab126c45 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/clock/realclock.h @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::RealClock + * \ingroup frameworkimpl + * + * \brief Implements a class for calculating current time. + * + * Real implementation for gathering all clock information used in application. + */ +#pragma once + +#include <vespa/storageframework/storageframework.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct RealClock : public Clock { + virtual MicroSecTime getTimeInMicros() const; + virtual MilliSecTime getTimeInMillis() const; + virtual SecondTime getTimeInSeconds() const; +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/.gitignore b/storageframework/src/vespa/storageframework/defaultimplementation/component/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt b/storageframework/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt new file mode 100644 index 00000000000..68e3fec2fca --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_componentimpl OBJECT + SOURCES + componentregisterimpl.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp new file mode 100644 index 00000000000..c523a24c69e --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp @@ -0,0 +1,188 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> +#include <vespa/storageframework/storageframework.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +ComponentRegisterImpl::ComponentRegisterImpl() + : _componentLock(), + _components(), + _topMetricSet("vds", "", ""), + _hooks(), + _metricManager(0), + _memoryManager(0), + _clock(0), + _threadPool(0), + _upgradeFlag(NO_UPGRADE_SPECIAL_HANDLING_ACTIVE), + _shutdownListener(0) +{ +} + +void +ComponentRegisterImpl::registerComponent(ManagedComponent& mc) +{ + vespalib::LockGuard lock(_componentLock); + _components.push_back(&mc); + if (_memoryManager != 0) mc.setMemoryManager(*_memoryManager); + if (_clock != 0) mc.setClock(*_clock); + if (_threadPool != 0) mc.setThreadPool(*_threadPool); + if (_metricManager != 0) mc.setMetricRegistrator(*this); + mc.setUpgradeFlag(_upgradeFlag); +} + +void +ComponentRegisterImpl::requestShutdown(vespalib::stringref reason) +{ + vespalib::LockGuard lock(_componentLock); + if (_shutdownListener != 0) { + _shutdownListener->requestShutdown(reason); + } +} + +void +ComponentRegisterImpl::setMetricManager(metrics::MetricManager& mm) +{ + std::vector<ManagedComponent*> components; + { + vespalib::LockGuard lock(_componentLock); + assert(_metricManager == 0); + components = _components; + _metricManager = &mm; + } + { + metrics::MetricLockGuard lock(mm.getMetricLock()); + mm.registerMetric(lock, _topMetricSet); + } + for (uint32_t i=0; i<components.size(); ++i) { + components[i]->setMetricRegistrator(*this); + } +} + +void +ComponentRegisterImpl::setMemoryManager(MemoryManagerInterface& mm) +{ + vespalib::LockGuard lock(_componentLock); + _memoryManager = &mm; + for (uint32_t i=0; i<_components.size(); ++i) { + _components[i]->setMemoryManager(mm); + } +} + +void +ComponentRegisterImpl::setClock(Clock& c) +{ + vespalib::LockGuard lock(_componentLock); + assert(_clock == 0); + _clock = &c; + for (uint32_t i=0; i<_components.size(); ++i) { + _components[i]->setClock(c); + } +} + +void +ComponentRegisterImpl::setThreadPool(ThreadPool& tp) +{ + vespalib::LockGuard lock(_componentLock); + assert(_threadPool == 0); + _threadPool = &tp; + for (uint32_t i=0; i<_components.size(); ++i) { + _components[i]->setThreadPool(tp); + } +} + +void +ComponentRegisterImpl::setUpgradeFlag(UpgradeFlags flag) +{ + vespalib::LockGuard lock(_componentLock); + _upgradeFlag = flag; + for (uint32_t i=0; i<_components.size(); ++i) { + _components[i]->setUpgradeFlag(_upgradeFlag); + } +} + +const StatusReporter* +ComponentRegisterImpl::getStatusReporter(vespalib::stringref id) +{ + vespalib::LockGuard lock(_componentLock); + for (uint32_t i=0; i<_components.size(); ++i) { + if (_components[i]->getStatusReporter() != 0 + && _components[i]->getStatusReporter()->getId() == id) + { + return _components[i]->getStatusReporter(); + } + } + return 0; +} + +std::vector<const StatusReporter*> +ComponentRegisterImpl::getStatusReporters() +{ + std::vector<const StatusReporter*> reporters; + vespalib::LockGuard lock(_componentLock); + for (uint32_t i=0; i<_components.size(); ++i) { + if (_components[i]->getStatusReporter() != 0) { + reporters.push_back(_components[i]->getStatusReporter()); + } + } + return reporters; +} + +void +ComponentRegisterImpl::registerMetric(metrics::Metric& m) +{ + metrics::MetricLockGuard lock(_metricManager->getMetricLock()); + _topMetricSet.registerMetric(m); +} + +namespace { + struct MetricHookWrapper : public metrics::MetricManager::UpdateHook { + MetricUpdateHook& _hook; + + MetricHookWrapper(vespalib::stringref name, + MetricUpdateHook& hook) + : metrics::MetricManager::UpdateHook(name.c_str()), + _hook(hook) + { + } + + void updateMetrics(const MetricLockGuard & guard) override { _hook.updateMetrics(guard); } + }; +} + +void +ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name, + MetricUpdateHook& hook, + SecondTime period) +{ + vespalib::LockGuard lock(_componentLock); + metrics::MetricManager::UpdateHook::LP hookPtr( + new MetricHookWrapper(name, hook)); + _hooks.push_back(hookPtr); + _metricManager->addMetricUpdateHook(*hookPtr, period.getTime()); +} + +metrics::MetricLockGuard +ComponentRegisterImpl::getMetricManagerLock() +{ + return _metricManager->getMetricLock(); +} + +void +ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener) +{ + vespalib::LockGuard lock(_componentLock); + if (_shutdownListener != 0) { + throw vespalib::IllegalStateException( + "A shutdown listener is already registered. Add functionality " + "for having multiple if we need multiple.", VESPA_STRLOC); + } + _shutdownListener = &listener; +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h new file mode 100644 index 00000000000..633123efdf3 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h @@ -0,0 +1,88 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::ComponentRegisterImpl + * \ingroup component + * + * \brief Application server uses this class to manage components. + * + * This class implements set functions for the various implementations needed. + * It will set these implementations in all components already registered, and + * in components registered after that. Simplifies login in application server + * as it can just instantiate components in some order and set implementations + * as soon as they exist. + * + * It is possibly to subclass this implementation. That is useful if you also + * subclass component class to provide extra functionality. Then you can handle + * that extra functionality in the subclass. + */ +#pragma once + +#include <vespa/metrics/metricmanager.h> +#include <vespa/storageframework/generic/component/componentregister.h> +#include <vespa/storageframework/generic/component/managedcomponent.h> +#include <vespa/storageframework/generic/metric/metricregistrator.h> +#include <vespa/storageframework/generic/status/statusreportermap.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct ShutdownListener { + virtual ~ShutdownListener() {} + virtual void requestShutdown(vespalib::stringref reason) = 0; +}; + +class ComponentRegisterImpl : public virtual ComponentRegister, + public StatusReporterMap, + public MetricRegistrator +{ + vespalib::Lock _componentLock; + std::vector<ManagedComponent*> _components; + + metrics::MetricSet _topMetricSet; + std::vector<metrics::MetricManager::UpdateHook::LP> _hooks; + metrics::MetricManager* _metricManager; + MemoryManagerInterface* _memoryManager; + Clock* _clock; + ThreadPool* _threadPool; + UpgradeFlags _upgradeFlag; + ShutdownListener* _shutdownListener; + +public: + typedef std::unique_ptr<ComponentRegisterImpl> UP; + + ComponentRegisterImpl(); + + bool hasMetricManager() const { return (_metricManager != 0); } + metrics::MetricManager& getMetricManager() { + assert(_metricManager != 0); + return *_metricManager; + } + + virtual void registerComponent(ManagedComponent&); + virtual void requestShutdown(vespalib::stringref reason); + + void setMetricManager(metrics::MetricManager&); + void setMemoryManager(MemoryManagerInterface&); + void setClock(Clock&); + void setThreadPool(ThreadPool&); + void setUpgradeFlag(UpgradeFlags flag); + + const StatusReporter* getStatusReporter(vespalib::stringref id); + std::vector<const StatusReporter*> getStatusReporters(); + + void registerMetric(metrics::Metric&); + void registerUpdateHook(vespalib::stringref name, + MetricUpdateHook& hook, + SecondTime period); + metrics::MetricLockGuard getMetricManagerLock() override; + void registerShutdownListener(ShutdownListener&); + +}; + +} // defaultimplementation +} // framework +} // storage + + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h new file mode 100644 index 00000000000..4b6b4e47269 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::TestComponentRegister + * \ingroup component + * + * \brief Simple instance to use for testing. + * + * For testing we just want to set up a simple component register with the basic + * services that tests need, and that all tests need the same instance of. + * + * This instance should be the same for all using it. So don't add set functions + * that can possibly alter it while running. + */ +#pragma once + +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> +#include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class TestComponentRegister { + ComponentRegisterImpl::UP _compReg; + FakeClock _clock; + ThreadPoolImpl _threadPool; + NoMemoryManager _memoryManager; + +public: + TestComponentRegister(ComponentRegisterImpl::UP compReg) + : _compReg(std::move(compReg)), + _clock(), + _threadPool(_clock), + _memoryManager() + { + assert(_compReg.get() != 0); + // Set a memory manager, so users can register memory types and + // ask for memory. + _compReg->setMemoryManager(_memoryManager); + // Set a fake clock, giving test control of clock + _compReg->setClock(_clock); + // Set a thread pool so components can make threads in tests. + _compReg->setThreadPool(_threadPool); + // Metric manager should not be needed. Tests of metric system can + // be done without using this class. Components can still register + // metrics without a manager. + + // Status page server should not be needed. Tests of status parts + // can be done without using this class. Components can still + // register status pages without a server + } + + ComponentRegisterImpl& getComponentRegister() { return *_compReg; } + FakeClock& getClock() { return _clock; } + ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; } + FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } + NoMemoryManager& getMemoryManager() { return _memoryManager; } +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/.gitignore b/storageframework/src/vespa/storageframework/defaultimplementation/memory/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/CMakeLists.txt b/storageframework/src/vespa/storageframework/defaultimplementation/memory/CMakeLists.txt new file mode 100644 index 00000000000..76fa9a9a08b --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/CMakeLists.txt @@ -0,0 +1,10 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_memoryimpl OBJECT + SOURCES + memorymanager.cpp + simplememorylogic.cpp + memorystate.cpp + prioritymemorylogic.cpp + nomemorymanager.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/emptymemorylogic.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/emptymemorylogic.h new file mode 100644 index 00000000000..ee91791bba3 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/emptymemorylogic.h @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <storage/memorymanager/memorymanager.h> +#include <storage/memorymanager/memorystate.h> + +namespace storage { + +class EmptyMemoryLogic : public AllocationLogic +{ +private: + MemoryState _state; + +public: + EmptyMemoryLogic() : _state(100) {} + + virtual void getState(MemoryState& state, bool resetMax) { + state = _state; + if (resetMax) _state.resetMax(); + } + + virtual MemoryToken::UP allocate( + const AllocationType& type, + uint64_t, uint64_t max, + storage::api::StorageMessage::Priority p, + ReduceMemoryUsageInterface* = 0) { + return MemoryToken::UP( + new MemoryToken(*this, type, max, p)); + } + + virtual bool resize(MemoryToken& token, uint64_t min, uint64_t max) { + setTokenSize(token, max); + return true; + } + + virtual void freeToken(MemoryToken&) {} + + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const + { + (void) verbose; (void) indent; + out << "EmptyMemoryLogic()"; + } +}; + +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.cpp new file mode 100644 index 00000000000..e04260a74fe --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.cpp @@ -0,0 +1,158 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> +#include <vespa/storageframework/defaultimplementation/memory/memorystate.h> +#include <vespa/vespalib/util/exceptions.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +MemoryTokenImpl::MemoryTokenImpl(AllocationLogic& logic, + const MemoryAllocationType& type, + uint64_t allocated, + uint8_t p, + ReduceMemoryUsageInterface* reducer) + : _logic(logic), + _reducer(reducer), + _currentlyAllocated(allocated), + _allocCount(1), + _type(type), + _priority(p) +{ +} + +MemoryTokenImpl::~MemoryTokenImpl() +{ + _logic.freeToken(*this); +} + +bool +MemoryTokenImpl::resize(uint64_t min, uint64_t max) +{ + return _logic.resize(*this, min, max); +} + +void +MemoryTokenImpl::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + (void) verbose; (void) indent; + out << "MemoryToken(" << _type.getName() << ": Allocated(" << _allocCount << " - " + << _currentlyAllocated << ")"; +} + +AllocationLogic::~AllocationLogic() +{ +} + +MemoryToken::UP +AllocationLogic::allocate(const MemoryAllocationType& type, + uint64_t min, + uint64_t max, + uint8_t priority, + ReduceMemoryUsageInterface* reducer) +{ + MemoryToken::UP token(allocate(type, priority, reducer)); + assert(token.get()); + if (!resize(*token, min, max, 1)) token.reset(); + return token; +} + +bool +AllocationLogic::resize(MemoryTokenImpl& token, uint64_t min, uint64_t max) +{ + return resize(token, min, max, 0); +} + +MemoryManager::MemoryManager(AllocationLogic::UP logic) + : _logic(std::move(logic)) +{ + if (_logic.get() == 0) { + throw vespalib::IllegalArgumentException( + "Needs a real logic class to run. (Got null pointer)", + VESPA_STRLOC); + } +} + +MemoryManager::~MemoryManager() +{ +} + +void +MemoryManager::setMaximumMemoryUsage(uint64_t max) +{ + _logic->setMaximumMemoryUsage(max); +} + +void +MemoryManager::getState(MemoryState& state, bool resetMax) +{ + return _logic->getState(state, resetMax); +} + +const MemoryAllocationType& +MemoryManager::registerAllocationType(const MemoryAllocationType& type) +{ + vespalib::LockGuard lock(_typeLock); + _types[type.getName()] = MemoryAllocationType::LP( + new MemoryAllocationType(type)); + return *_types[type.getName()]; +} + +const MemoryAllocationType& +MemoryManager::getAllocationType(const std::string& name) const +{ + vespalib::LockGuard lock(_typeLock); + std::map<std::string, MemoryAllocationType::LP>::const_iterator it( + _types.find(name)); + if (it == _types.end()) { + throw vespalib::IllegalArgumentException( + "Allocation type not found: " + name, VESPA_STRLOC); + } + return *it->second; +} + +std::vector<const MemoryAllocationType*> +MemoryManager::getAllocationTypes() const +{ + vespalib::LockGuard lock(_typeLock); + std::vector<const MemoryAllocationType*> types; + for(std::map<std::string, MemoryAllocationType::LP>::const_iterator it + = _types.begin(); it != _types.end(); ++it) + { + types.push_back(it->second.get()); + } + return types; +} + +MemoryToken::UP +MemoryManager::allocate(const MemoryAllocationType& type, + uint64_t min, + uint64_t max, + uint8_t p, + ReduceMemoryUsageInterface* i) +{ + return _logic->allocate(type, min, max, p, i); +} + +uint64_t +MemoryManager::getMemorySizeFreeForPriority(uint8_t priority) const +{ + return _logic->getMemorySizeFreeForPriority(priority); +} + +void +MemoryManager::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + (void) verbose; (void) indent; + out << "Memory Manager {" << "\n" << indent << " "; + _logic->print(out, verbose, indent + " "); + out << "\n" << indent << "}"; +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.h new file mode 100644 index 00000000000..488f32ec5ca --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorymanager.h @@ -0,0 +1,169 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::MemoryManager + * + * \brief Utility for tracking memory usage on distributor and storage nodes. + * + * The memory manager is responsible for limiting the memory allocated by + * various users within VDS storage and distributor nodes, such that these + * nodes don't use more memory than they can use, to avoid swapping. + * + * It will produce a status page to view big memory users, and give some + * historic data. It will track memory users and give them less and less memory + * as closer we are to utilizing all memory we are able to use. When getting + * close to full it will deny memory allocations to incoming commands that wants + * to use additional memory in able to complete the operation. + * + * The main class here defines the interface the client has to worry about. It + * should thus not point to the implementation in any way. + * + */ + +#pragma once + +#include <boost/utility.hpp> +#include <map> +#include <vespa/storageframework/generic/memory/memorymanagerinterface.h> +#include <vespa/vespalib/util/printable.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class MemoryManager; +class AllocationLogic; +class MemoryState; + +class MemoryTokenImpl : public vespalib::Printable, + public MemoryToken, + public boost::noncopyable +{ + friend class AllocationLogic; + + AllocationLogic& _logic; + ReduceMemoryUsageInterface* _reducer; + uint64_t _currentlyAllocated; + uint32_t _allocCount; + const MemoryAllocationType& _type; + uint8_t _priority; + +public: + typedef std::unique_ptr<MemoryTokenImpl> UP; + + MemoryTokenImpl(AllocationLogic& logic, + const MemoryAllocationType& type, + uint64_t allocated, + uint8_t priority, + ReduceMemoryUsageInterface* = 0); + + virtual ~MemoryTokenImpl(); + + uint64_t getSize() const { return _currentlyAllocated; } + uint64_t getAllocationCount() const { return _allocCount; } + const MemoryAllocationType& getType() const { return _type; } + ReduceMemoryUsageInterface* getReducer() const { return _reducer; } + + uint8_t getPriority() const { return _priority; } + + virtual bool resize(uint64_t min, uint64_t max); + + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const; +}; + +class AllocationLogic : public vespalib::Printable +{ +protected: + /** + * MemoryTokens are friends with this class, such that logic classes + * can use this function to alter token size, without function for that + * being public. + */ + void setTokenSize(MemoryTokenImpl& token, uint64_t size) + { token._currentlyAllocated = size; } + + virtual MemoryToken::UP allocate(const MemoryAllocationType&, + uint8_t priority, + ReduceMemoryUsageInterface*) = 0; + virtual bool resize(MemoryToken& token, uint64_t min, uint64_t max, + uint32_t allocationCounts) = 0; +public: + typedef std::unique_ptr<AllocationLogic> UP; + virtual ~AllocationLogic() = 0; + + virtual void setMaximumMemoryUsage(uint64_t max) = 0; + + virtual void getState(MemoryState&, bool resetMax = false) = 0; + + /** + * Decide how much to allocate for this request. Should be between min + * and max, unless it's of a type that can be denied (such as external + * requests), in which case we can also deny allocation by returning a null + * token. + */ + MemoryToken::UP allocate(const MemoryAllocationType&, + uint64_t min, + uint64_t max, + uint8_t priority, + ReduceMemoryUsageInterface* = 0); + /** + * Resize the size in a token. If more memory is requested, then it might + * fail. The sizes given in min and max is given as total min and max, + * including any memory you may already have. If successful, the logic will + * have added this size to the token passed in. + */ + bool resize(MemoryTokenImpl& token, uint64_t min, uint64_t max); + + // Called by token destructor to free up tracked resources + virtual void freeToken(MemoryTokenImpl& token) = 0; + + virtual uint64_t getMemorySizeFreeForPriority(uint8_t priority) const = 0; + + // vespalib::Printable implementation + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const = 0; +}; + +class MemoryManager : public vespalib::Printable, + public MemoryManagerInterface +{ + AllocationLogic::UP _logic; + vespalib::Lock _typeLock; + std::map<std::string, MemoryAllocationType::LP> _types; + +public: + typedef std::unique_ptr<MemoryManager> UP; + + MemoryManager(AllocationLogic::UP); + ~MemoryManager(); + + virtual void setMaximumMemoryUsage(uint64_t max); + virtual void getState(MemoryState& state, bool resetMax = false); + + virtual const MemoryAllocationType& + registerAllocationType(const MemoryAllocationType& type); + + virtual const MemoryAllocationType& + getAllocationType(const std::string& name) const; + + virtual std::vector<const MemoryAllocationType*> getAllocationTypes() const; + + MemoryToken::UP allocate( + const MemoryAllocationType&, + uint64_t min, + uint64_t max, + uint8_t p, + ReduceMemoryUsageInterface* = 0); + + virtual uint64_t getMemorySizeFreeForPriority(uint8_t priority) const; + + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const; + +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.cpp new file mode 100644 index 00000000000..0c82a63792a --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.cpp @@ -0,0 +1,224 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +#include <vespa/storageframework/defaultimplementation/memory/memorystate.h> + +LOG_SETUP(".memory.state"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +MemoryState::Entry::Entry() + : _currentUsedSize(0), + _totalUserCount(0), + _currentUserCount(0), + _wantedCount(0), + _minimumCount(0), + _deniedCount(0), + _forcedBeyondMaximumCount(0) +{ +} + +void +MemoryState::Entry::operator+=(const Entry& other) +{ + _currentUsedSize += other._currentUsedSize; + _currentUserCount += other._currentUserCount; + _totalUserCount += other._totalUserCount; + _wantedCount += other._wantedCount; + _minimumCount += other._minimumCount; + _deniedCount += other._deniedCount; + _forcedBeyondMaximumCount += other._forcedBeyondMaximumCount; +} + +MemoryState::SnapShot& +MemoryState::SnapShot::operator+=(const MemoryState::SnapShot& other) +{ + for (AllocationMap::const_iterator it = other._allocations.begin(); + it != other._allocations.end(); ++it) + { + PriorityMap& map(_allocations[it->first]); + for (PriorityMap::const_iterator it2 = it->second.begin(); + it2 != it->second.end(); ++it2) + { + Entry& entry(map[it2->first]); + entry += it2->second; + } + } + return *this; +} + +uint64_t +MemoryState::SnapShot::getUserCount() const +{ + uint64_t count = 0; + for (AllocationMap::const_iterator it = _allocations.begin(); + it != _allocations.end(); ++it) + { + for (PriorityMap::const_iterator it2 = it->second.begin(); + it2 != it->second.end(); ++it2) + { + count += it2->second._currentUserCount; + } + } + return count; +} + +MemoryState::MemoryState(Clock& clock, uint64_t maxMemory) + : _clock(&clock), + _maxMemory(maxMemory), + _current(), + _max(), + _minJumpToUpdateMax(10 * 1024 * 1024) +{ +} + +void +MemoryState::addToEntry(const MemoryAllocationType& type, uint64_t memory, + uint8_t priority, + AllocationResult result, bool forcedAllocation, + uint64_t allocationCounts) +{ + LOG(spam, "Allocating memory %s - %" PRIu64 " bytes at priority %u. " + "Count %" PRIu64 ".", + type.getName().c_str(), memory, priority, allocationCounts); + PriorityMap& map(_current._allocations[&type]); + Entry& e(map[priority]); + e._currentUsedSize += memory; + e._totalUserCount += allocationCounts; + if (allocationCounts == 0) { + // Resizes adding no more users still count as another total + // allocation attempt. + ++e._totalUserCount; + } + e._currentUserCount += allocationCounts; + switch (result) { + case GOT_MAX: ++e._wantedCount; break; + case GOT_MIN: ++e._minimumCount; break; + case DENIED: ++e._deniedCount; break; + } + if (forcedAllocation) ++e._forcedBeyondMaximumCount; + _current._usedMemory += memory; + if (!type.isCache()) { + _current._usedWithoutCache += memory; + } + if (_current._usedWithoutCache + > _max._usedWithoutCache + _minJumpToUpdateMax) + { + LOG(spam, "Updating max to current %" PRIu64 " bytes of memory used", + _current._usedWithoutCache); + _max = _current; + _max._timeTaken = _clock->getTimeInSeconds(); + } +} + +void +MemoryState::removeFromEntry(const MemoryAllocationType& type, uint64_t memory, + uint8_t priority, + uint64_t allocationCounts) +{ + LOG(spam, "Freeing memory %s - %" PRIu64 " bytes at priority %u. " + "Count %" PRIu64 ".", + type.getName().c_str(), memory, priority, allocationCounts); + PriorityMap& map(_current._allocations[&type]); + Entry& e(map[priority]); + e._currentUsedSize -= memory; + e._currentUserCount -= allocationCounts; + _current._usedMemory -= memory; + if (!type.isCache()) { + _current._usedWithoutCache -= memory; + } +} + +void +MemoryState::Entry::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + (void) verbose; (void) indent; + std::ostringstream ost; + ost << "Used(" << _currentUsedSize << " B / " + << _currentUserCount << ") "; + for (uint32_t i=ost.str().size(); i<20; ++i) { + ost << " "; + } + + out << ost.str() + << "Stats(" << _totalUserCount + << ", " << _wantedCount + << ", " << _minimumCount + << ", " << _deniedCount + << ", " << _forcedBeyondMaximumCount << ")"; +} + +namespace { + void printAllocations(std::ostream& out, + const MemoryState::AllocationMap& map, + const std::string& indent) + { + std::map<std::string, std::string> allocs; + + for (MemoryState::AllocationMap::const_iterator it = map.begin(); + it != map.end(); ++it) + { + for (MemoryState::PriorityMap::const_iterator it2 + = it->second.begin(); it2 != it->second.end(); ++it2) + { + std::ostringstream name; + name << it->first->getName() << "(" + << static_cast<uint16_t>(it2->first) << "): "; + for (uint32_t i=name.str().size(); i<25; ++i) { + name << " "; + } + + std::ostringstream tmp; + it2->second.print(tmp, true, indent + " "); + + allocs[name.str()] = tmp.str(); + } + } + + for (std::map<std::string, std::string>::const_iterator it + = allocs.begin(); it != allocs.end(); ++it) + { + out << "\n" << indent << " " << it->first << it->second; + } + } +} + +void +MemoryState::SnapShot::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + out << "SnapShot(Used " << _usedMemory << ", w/o cache " + << _usedWithoutCache; + if (verbose) { + out << ") {"; + if (_usedMemory > 0) { + out << "\n" << indent << " Type(Pri): Used(Size/Allocs) " + << "Stats(Allocs, Wanted, Min, Denied, Forced)"; + } + printAllocations(out, _allocations, indent); + out << "\n" << indent << "}"; + } +} + +void +MemoryState::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + bool maxSet = (_max._usedWithoutCache > _current._usedWithoutCache); + out << "MemoryState(Max memory: " << _maxMemory << ") {" + << "\n" << indent << " Current: "; + _current.print(out, verbose, indent + " "); + if (maxSet) { + out << "\n" << indent << " Max: "; + _max.print(out, verbose, indent + " "); + } + out << "\n" << indent << "}"; +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.h new file mode 100644 index 00000000000..f78927da6ce --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/memorystate.h @@ -0,0 +1,141 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::MemoryState + * + * \brief Shows the state of current memory users + * + */ + +#pragma once + +#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> +#include <vespa/storageframework/storageframework.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class MemoryState : public vespalib::Printable { +public: + struct Entry { + // Total number of bytes allocated to this entry right now + uint64_t _currentUsedSize; + // Total number of allocations done on this entry + uint64_t _totalUserCount; + // Total number of allocations for this entry right now + uint32_t _currentUserCount; + // Amount of times this entry has gotten all the memory it wanted + uint32_t _wantedCount; + // Amount of times this entry has gotten less than all the memory + // it wanted + uint32_t _minimumCount; + // Amount of times this entry has been denied getting memory + uint32_t _deniedCount; + // Amount of times this entry has forced memory allocations beyond + // the maximum + uint32_t _forcedBeyondMaximumCount; + + Entry(); + + void print(std::ostream& out, bool verbose, + const std::string& indent) const; + + /** + * Set this instances counts to the counts from the other entry. + */ + void transferCounts(const Entry& other); + + void operator+=(const Entry& other); + }; + + typedef std::map<uint8_t, Entry> PriorityMap; + typedef std::map<const MemoryAllocationType*, PriorityMap> AllocationMap; + + /** + * A snapshot contains data for either current or max seen data. + * When a new maximum is seen, current is copied to max. + */ + class SnapShot : public vespalib::Printable { + friend class MemoryState; + + uint64_t _usedMemory; + uint64_t _usedWithoutCache; + SecondTime _timeTaken; + AllocationMap _allocations; + + public: + SnapShot() : vespalib::Printable() { clear(); } + SnapShot(const SnapShot& o) : vespalib::Printable() { (*this) = o; } + + void print(std::ostream& out, bool verbose, + const std::string& indent) const; + + void clear() { + _usedMemory = 0; + _usedWithoutCache = 0; + _timeTaken.setTime(0); + _allocations.clear(); + } + + SnapShot& operator=(const SnapShot& other) { + _usedMemory = other._usedMemory; + _usedWithoutCache = other._usedWithoutCache; + _timeTaken = other._timeTaken; + _allocations = other._allocations; + return *this; + } + + SnapShot& operator+=(const SnapShot& other); + + const AllocationMap& getAllocations() const { return _allocations; } + uint64_t getUsedSize() const { return _usedMemory; } + uint64_t getUsedSizeIgnoringCache() const { return _usedWithoutCache; } + uint64_t getUserCount() const; + }; + +private: + Clock* _clock; + uint64_t _maxMemory; + SnapShot _current; + SnapShot _max; + uint32_t _minJumpToUpdateMax; + +public: + MemoryState(Clock& clock, uint64_t maxMemory); + + void print(std::ostream& out, bool verbose, + const std::string& indent) const; + + void setMaximumMemoryUsage(uint64_t max) { _maxMemory = max; } + void setMinJumpToUpdateMax(uint32_t bytes) { _minJumpToUpdateMax = bytes; } + + enum AllocationResult { GOT_MAX, GOT_MIN, DENIED }; + void addToEntry(const MemoryAllocationType& type, uint64_t memory, + uint8_t priority, + AllocationResult result, bool forcedAllocation = false, + uint64_t allocationCounts = 1); + + void removeFromEntry(const MemoryAllocationType& type, uint64_t memory, + uint8_t priority, + uint64_t allocationCounts = 1); + void resetMax() { + _max = _current; + _max._timeTaken = _clock->getTimeInSeconds(); + } + + const SnapShot& getCurrentSnapshot() const { return _current; } + const SnapShot& getMaxSnapshot() const { return _max; } + + + uint64_t getTotalSize() const { return _maxMemory; } + uint64_t getFreeSize() const { + return _maxMemory > _current._usedMemory + ? _maxMemory - _current._usedMemory : 0; + } +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.cpp new file mode 100644 index 00000000000..c8d8e339bb9 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.cpp @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/memory/nomemorymanager.h> +#include <vespa/vespalib/util/exceptions.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +const MemoryAllocationType& +NoMemoryManager::registerAllocationType(const MemoryAllocationType& type) +{ + vespalib::LockGuard lock(_typeLock); + _types[type.getName()] = MemoryAllocationType::LP( + new MemoryAllocationType(type)); + return *_types[type.getName()]; +} + +const MemoryAllocationType& +NoMemoryManager::getAllocationType(const std::string& name) const +{ + vespalib::LockGuard lock(_typeLock); + std::map<std::string, MemoryAllocationType::LP>::const_iterator it( + _types.find(name)); + if (it == _types.end()) { + throw vespalib::IllegalArgumentException( + "Allocation type not found: " + name, VESPA_STRLOC); + } + return *it->second; +} + +std::vector<const MemoryAllocationType*> +NoMemoryManager::getAllocationTypes() const +{ + vespalib::LockGuard lock(_typeLock); + std::vector<const MemoryAllocationType*> types; + for(std::map<std::string, MemoryAllocationType::LP>::const_iterator it + = _types.begin(); it != _types.end(); ++it) + { + types.push_back(it->second.get()); + } + return types; +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.h new file mode 100644 index 00000000000..73a986ec877 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/nomemorymanager.h @@ -0,0 +1,72 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::NoMemoryManager + * + * \brief Memory manager that gives out max memory to everyone. + * + * Memory manager to use for testing and for apps not wanting to track memory. + * This manager will merely give out max to everyone who asks and not even keep + * track of anything. + */ + +#pragma once + +#include <boost/utility.hpp> +#include <map> +#include <vespa/storageframework/generic/memory/memorymanagerinterface.h> +#include <vespa/vespalib/util/printable.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class SimpleMemoryTokenImpl : public MemoryToken, public boost::noncopyable +{ + uint64_t _allocated; + +public: + SimpleMemoryTokenImpl(uint64_t allocated) : _allocated(allocated) {} + + virtual uint64_t getSize() const { return _allocated; } + virtual bool resize(uint64_t /* min */, uint64_t max) + { _allocated = max; return true; } +}; + +class NoMemoryManager : public MemoryManagerInterface +{ + vespalib::Lock _typeLock; + std::map<std::string, MemoryAllocationType::LP> _types; + +public: + typedef std::unique_ptr<NoMemoryManager> UP; + + virtual void setMaximumMemoryUsage(uint64_t) {} + + virtual const MemoryAllocationType& + registerAllocationType(const MemoryAllocationType& type); + + virtual const MemoryAllocationType& + getAllocationType(const std::string& name) const; + + MemoryToken::UP allocate( + const MemoryAllocationType&, + uint64_t /* min */, + uint64_t max, + uint8_t /* priority */, + ReduceMemoryUsageInterface* = 0) + { + return SimpleMemoryTokenImpl::UP(new SimpleMemoryTokenImpl(max)); + } + virtual uint64_t getMemorySizeFreeForPriority(uint8_t priority) const { + (void) priority; + return std::numeric_limits<uint64_t>().max(); + } + + virtual std::vector<const MemoryAllocationType*> getAllocationTypes() const; +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.cpp new file mode 100644 index 00000000000..e7cd372d97d --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.cpp @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h> + +#include <vespa/log/log.h> +#include <vespa/vespalib/util/exceptions.h> + +LOG_SETUP(".memory.logic.priority"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +PriorityMemoryLogic::PriorityMemoryLogic(Clock& c, uint64_t maxMem) + : SimpleMemoryLogic(c, maxMem) +{ + LOG(debug, "Setup priority memory logic with max memory of %" PRIu64 " bytes", + maxMem); +} + +float +PriorityMemoryLogic::getNonCacheThreshold(uint8_t priority) const +{ + return 0.6 + ((255 - priority) / 255.0) * 0.4; +} + +void +PriorityMemoryLogic::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + out << "PriorityMemoryLogic() : "; + SimpleMemoryLogic::print(out, verbose, indent); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h new file mode 100644 index 00000000000..0761603a324 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +/** + * \class storage::PriorityMemoryLogic + * + * \brief Priority logic deciding who should get memory and how much. + * + */ + +#pragma once + +#include <vespa/storageframework/defaultimplementation/memory/simplememorylogic.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct PriorityMemoryLogic : public SimpleMemoryLogic +{ + PriorityMemoryLogic(Clock&, uint64_t maxMemory); + + virtual float getNonCacheThreshold(uint8_t priority) const; + + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const; +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.cpp new file mode 100644 index 00000000000..6e64d7e704e --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.cpp @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/memory/simplememorylogic.h> + +#include <vespa/log/log.h> +#include <vespa/vespalib/util/exceptions.h> + +LOG_SETUP(".memory.logic.simple"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +SimpleMemoryLogic::SimpleMemoryLogic(Clock& c, uint64_t maxMemory) + : _cacheThreshold(0.98), + _nonCacheThreshold(0.8), + _state(c, maxMemory), + _reducers() +{ + LOG(debug, "Setup simple memory logic with max memory of %" PRIu64 " bytes", + maxMemory); +} + +void +SimpleMemoryLogic::setMaximumMemoryUsage(uint64_t max) +{ + vespalib::LockGuard lock(_stateLock); + _state.setMaximumMemoryUsage(max); +} + +void +SimpleMemoryLogic::getState(MemoryState& state, bool resetMax) { + vespalib::LockGuard lock(_stateLock); + state = _state; + if (resetMax) _state.resetMax(); +} + +MemoryToken::UP +SimpleMemoryLogic::allocate(const MemoryAllocationType& type, + uint8_t priority, + ReduceMemoryUsageInterface* reducer) +{ + MemoryTokenImpl::UP token( + new MemoryTokenImpl(*this, type, 0, priority, reducer)); + if (reducer != 0) { + vespalib::LockGuard lock(_stateLock); + _reducers.push_back(Reducer(*token, *reducer)); + } + return std::move(token); +} + +bool +SimpleMemoryLogic::resize(MemoryToken& tok, uint64_t min, uint64_t max, + uint32_t allocationCounts) +{ + vespalib::LockGuard lock(_stateLock); + MemoryTokenImpl& token(static_cast<MemoryTokenImpl&>(tok)); + LOG(spam, "Attempting to resize %s to size in the range %" PRIu64 " to " + "%" PRIu64 ".", token.toString().c_str(), min, max); + if (token.getSize() > max) { // Always safe to reduce size + handleReduction(token, max, allocationCounts); + return true; + } + // If not reducing size, calculate relative min/max values. + uint64_t relMin = (min > token.getSize() ? min - token.getSize() : 0); + uint64_t relMax = max - token.getSize(); + return resizeRelative(token, relMin, relMax, allocationCounts); +} + +void +SimpleMemoryLogic::handleReduction(MemoryTokenImpl& token, uint64_t max, + uint32_t allocationCounts) +{ + LOG(spam, "Reduzing size of token by %" PRIu64 ".", + token.getSize() - max); + _state.removeFromEntry(token.getType(), token.getSize() - max, + token.getPriority(), allocationCounts); + setTokenSize(token, max); +} + +bool +SimpleMemoryLogic::handleCacheMemoryRequest( + MemoryTokenImpl& token, uint64_t min, uint64_t max, + uint32_t allocationCounts) +{ + uint64_t usedSize(_state.getCurrentSnapshot().getUsedSize()); + uint64_t thresholdSize = uint64_t(getCacheThreshold() + * _state.getTotalSize()); + uint64_t toAllocate(thresholdSize > usedSize + ? std::min(thresholdSize - usedSize, max) + : 0); + bool forced = false; + if (token.getType().isAllocationsForced() && toAllocate < min) { + toAllocate = min; + forced = true; + } + if (toAllocate < min) { + LOG(spam, "We cannot give more memory to cache without going above " + "cache threshold (%" PRIu64 " B)", thresholdSize); + _state.addToEntry(token.getType(), 0, token.getPriority(), + MemoryState::DENIED, false, allocationCounts); + return false; + } + LOG(spam, "Giving %" PRIu64 " bytes of memory to cache. (Cache threshold " + "is %" PRIu64 ", used size is %" PRIu64 ", %" PRIu64 " bytes were " + "always allocated to the token and it wanted memory between %" + PRIu64 " and %" PRIu64 ".", + toAllocate, thresholdSize, usedSize, token.getSize(), min, max); + _state.addToEntry(token.getType(), toAllocate, token.getPriority(), + static_cast<uint64_t>(toAllocate) >= max + ? MemoryState::GOT_MAX : MemoryState::GOT_MIN, + forced, allocationCounts); + setTokenSize(token, token.getSize() + toAllocate); + return true; +} + +uint64_t +SimpleMemoryLogic::getMemorySizeFreeForPriority(uint8_t priority) const +{ + uint64_t usedSize(_state.getCurrentSnapshot().getUsedSizeIgnoringCache()); + uint64_t thresholdSize = uint64_t(getNonCacheThreshold(priority) + * _state.getTotalSize()); + return (usedSize >= thresholdSize ? 0 : thresholdSize - usedSize); +} + +bool +SimpleMemoryLogic::resizeRelative( + MemoryTokenImpl& token, uint64_t min, uint64_t max, + uint32_t allocationCounts) +{ + LOG(spam, "Relative resize change. Need another %zu-%zu byte of memory.", + min, max); + // If requester is cache, use cache threshold + if (token.getType().isCache()) { + return handleCacheMemoryRequest(token, min, max, allocationCounts); + } + // If we get here, requester is not cache. + uint64_t usedSize(_state.getCurrentSnapshot().getUsedSizeIgnoringCache()); + uint64_t thresholdSize = uint64_t(getNonCacheThreshold(token.getPriority()) + * _state.getTotalSize()); + uint64_t toAllocate = 0; + if (thresholdSize > usedSize) { + toAllocate = std::min(max, thresholdSize - usedSize); + } + if (toAllocate < min) toAllocate = min; + bool forced = false; + if (usedSize + toAllocate > _state.getTotalSize()) { + if (token.getType().isAllocationsForced()) { + forced = true; + } else { + LOG(spam, "We cannot give more memory without going beyond max"); + _state.addToEntry(token.getType(), 0, token.getPriority(), + MemoryState::DENIED, false, allocationCounts); + return false; + } + } + // External load should not fill up too much + if (usedSize + toAllocate > thresholdSize + && token.getType().isExternalLoad() + && !token.getType().isAllocationsForced()) + { + LOG(spam, "Not giving external load memory beyond threshold."); + _state.addToEntry(token.getType(), 0, token.getPriority(), + MemoryState::DENIED, false, allocationCounts); + return false; + } + // If this puts us above max with cache, remove some cache. + if (_state.getCurrentSnapshot().getUsedSize() + toAllocate + > _state.getTotalSize()) + { + uint64_t needed(_state.getCurrentSnapshot().getUsedSize() + + toAllocate - _state.getTotalSize()); + for (uint32_t i=0; i<_reducers.size(); ++i) { + MemoryTokenImpl& rtoken(*_reducers[i]._token); + uint64_t reduceBy(std::min(needed, rtoken.getSize())); + uint64_t reduced(_reducers[i]._reducer->reduceMemoryConsumption( + rtoken, reduceBy)); + _state.removeFromEntry(rtoken.getType(), reduced, + rtoken.getPriority(), 0); + setTokenSize(rtoken, rtoken.getSize() - reduced); + needed -= reduceBy; + if (needed == 0) break; + if (reduced < reduceBy) { + LOG(debug, "Reducer refused to free the full %" PRIu64 " bytes " + "requested. %" PRIu64 " bytes reduced in token %s.", + reduceBy, reduced, rtoken.toString().c_str()); + } + } + } + if (_state.getCurrentSnapshot().getUsedSize() + toAllocate + > _state.getTotalSize()) + { + LOGBP(debug, "Failed to free enough memory from cache. This puts us " + "above max memory."); + } + LOG(spam, "Giving %" PRIu64 " bytes of memory", toAllocate); + _state.addToEntry(token.getType(), toAllocate, token.getPriority(), + static_cast<uint64_t>(toAllocate) >= max + ? MemoryState::GOT_MAX : MemoryState::GOT_MIN, + forced, allocationCounts); + setTokenSize(token, token.getSize() + toAllocate); + return true; +} + +void +SimpleMemoryLogic::freeToken(MemoryTokenImpl& token) +{ + vespalib::LockGuard lock(_stateLock); + _state.removeFromEntry(token.getType(), token.getSize(), + token.getPriority(), token.getAllocationCount()); + if (token.getReducer() != 0) { + std::vector<Reducer> reducers; + reducers.reserve(_reducers.size() - 1); + for (uint32_t i=0; i<_reducers.size(); ++i) { + if (_reducers[i]._token != &token) reducers.push_back(_reducers[i]); + } + assert(reducers.size() + 1 == _reducers.size()); + reducers.swap(_reducers); + } +} + +void +SimpleMemoryLogic::print(std::ostream& out, bool verbose, + const std::string& indent) const +{ + out << "SimpleMemoryLogic() {\n" + << indent << " "; + vespalib::LockGuard lock(_stateLock); + _state.print(out, verbose, indent + " "); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.h b/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.h new file mode 100644 index 00000000000..aafae3d6a21 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/memory/simplememorylogic.h @@ -0,0 +1,100 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +/** + * \class storage::SimpleMemoryLogic + * + * \brief Simple logic deciding who should get memory and how much. + * + * There is a cache threshold. By default 98%. Cache will always get memory up + * till this fillrate. + * + * There is a non-cache threshold. Non-cache memory requesters will get maximum + * memory until threshold is reached. If getting maximum memory would go beyond + * the non-cache threshold, the requester will get enough memory to hit the + * threshold (if more than minimum), or get the minimum memory asked for, if + * that doesn't put usage above 100%. + * + * Usage above 100% is attempted avoided by freeing cache memory. If failing to + * free enough memory, request will fail, or minimum will be get if allocation + * is forced such that it cannot fail. In such a case, usage may go beyond 100%. + */ + +#pragma once + +#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> +#include <vespa/storageframework/defaultimplementation/memory/memorystate.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class SimpleMemoryLogic : public AllocationLogic +{ + float _cacheThreshold; + float _nonCacheThreshold; + vespalib::Lock _stateLock; + MemoryState _state; + struct Reducer { + MemoryTokenImpl* _token; + ReduceMemoryUsageInterface* _reducer; + + Reducer() : _token(0), _reducer(0) {} + Reducer(MemoryTokenImpl& t, + ReduceMemoryUsageInterface& r) + : _token(&t), _reducer(&r) {} + }; + std::vector<Reducer> _reducers; + +protected: + float getCacheThreshold() { return _cacheThreshold; } + + // Priority memory logic can override this to set a threshold based on + // priority + virtual float getNonCacheThreshold(uint8_t priority) const + { (void) priority; return _nonCacheThreshold; } + +public: + typedef std::unique_ptr<SimpleMemoryLogic> UP; + + SimpleMemoryLogic(Clock&, uint64_t maxMemory); + + SimpleMemoryLogic& setMinJumpToUpdateMax(uint32_t bytes) { + _state.setMinJumpToUpdateMax(bytes); + return *this; + } + + virtual void setMaximumMemoryUsage(uint64_t max); + + void setCacheThreshold(float limit) { _cacheThreshold = limit; } + void setNonCacheThreshold(float limit) { _nonCacheThreshold = limit; } + + MemoryState& getState() { return _state; } // Not threadsafe. Unit testing. + + virtual void getState(MemoryState& state, bool resetMax); + + virtual MemoryToken::UP allocate(const MemoryAllocationType&, + uint8_t priority, + ReduceMemoryUsageInterface* = 0); + virtual bool resize(MemoryToken& token, uint64_t min, uint64_t max, + uint32_t allocationCounts); + + virtual void freeToken(MemoryTokenImpl& token); + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const; + + virtual uint64_t getMemorySizeFreeForPriority(uint8_t priority) const; + +private: + void handleReduction(MemoryTokenImpl&, uint64_t size, + uint32_t allocationCounts); + bool resizeRelative(MemoryTokenImpl&, uint64_t min, uint64_t max, + uint32_t allocationCounts); + bool handleCacheMemoryRequest(MemoryTokenImpl&, uint64_t min, uint64_t max, + uint32_t allocationCounts); +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/.gitignore b/storageframework/src/vespa/storageframework/defaultimplementation/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt b/storageframework/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt new file mode 100644 index 00000000000..d692fb24705 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_threadimpl OBJECT + SOURCES + threadimpl.cpp + threadpoolimpl.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp new file mode 100644 index 00000000000..20fd12ea959 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -0,0 +1,150 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/thread/threadimpl.h> + +#include <vespa/log/log.h> +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> + +LOG_SETUP(".framework.thread.impl"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, + Runnable& runnable, + vespalib::stringref id, + uint64_t waitTimeMs, + uint64_t maxProcessTimeMs, + int ticksBeforeWait) + : Thread(id), + _pool(pool), + _runnable(runnable), + _properties(waitTimeMs, maxProcessTimeMs, ticksBeforeWait), + _tickData(), + _tickDataPtr(0), + _interrupted(false), + _joined(false), + _thread(*this) +{ + _tickData[_tickDataPtr]._lastTickMs = pool.getClock().getTimeInMillis().getTime(); + _thread.start(_pool.getThreadPool()); +} + +ThreadImpl::~ThreadImpl() +{ + interrupt(); + join(); +} + +void +ThreadImpl::run() +{ + _runnable.run(*this); + _pool.unregisterThread(*this); + _joined = true; +} + +bool +ThreadImpl::interrupted() const +{ + return _interrupted; +} + +bool +ThreadImpl::joined() const +{ + return _joined; +} + +void +ThreadImpl::interrupt() +{ + _interrupted = true; + _thread.stop(); +} + +void +ThreadImpl::join() +{ + _thread.join(); +} + +void +ThreadImpl::registerTick(CycleType cycleType, MilliSecTime time) +{ + if (!time.isSet()) time = _pool.getClock().getTimeInMillis(); + ThreadTickData data(getTickData()); + uint64_t previousTickMs = data._lastTickMs; + uint64_t nowMs = time.getTime(); + data._lastTickMs = nowMs; + data._lastTickType = cycleType; + setTickData(data); + + if (data._lastTickMs == 0) { return; } + + if (previousTickMs > nowMs) { + LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but " + "last time it registered a tick, the time was %" + PRIu64 ". Assuming clock has been adjusted backwards", + nowMs, previousTickMs); + return; + } + uint64_t cycleTimeMs = nowMs - previousTickMs; + if (cycleType == WAIT_CYCLE) { + data._maxWaitTimeSeenMs = std::max(data._maxWaitTimeSeenMs, cycleTimeMs); + } else { + data._maxProcessingTimeSeenMs = std::max(data._maxProcessingTimeSeenMs, cycleTimeMs); + } +} + +ThreadTickData +ThreadImpl::getTickData() const +{ + return _tickData[_tickDataPtr].loadRelaxed(); +} + +void +ThreadImpl::setTickData(const ThreadTickData& tickData) +{ + uint32_t nextData = (_tickDataPtr + 1) % _tickData.size(); + _tickData[nextData].storeRelaxed(tickData); + _tickDataPtr = nextData; +} + +void +ThreadImpl::updateParameters(uint64_t waitTimeMs, + uint64_t maxProcessTimeMs, + int ticksBeforeWait) { + _properties.setWaitTime(waitTimeMs); + _properties.setMaxProcessTime(maxProcessTimeMs); + _properties.setTicksBeforeWait(ticksBeforeWait); +} + +ThreadTickData +ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept +{ + ThreadTickData result; + constexpr auto relaxed = std::memory_order_relaxed; + result._lastTickType = _lastTickType.load(relaxed); + result._lastTickMs = _lastTickMs.load(relaxed); + result._maxProcessingTimeSeenMs = _maxProcessingTimeSeenMs.load(relaxed); + result._maxWaitTimeSeenMs = _maxWaitTimeSeenMs.load(relaxed); + return result; +} + +void +ThreadImpl::AtomicThreadTickData::storeRelaxed( + const ThreadTickData& newState) noexcept +{ + constexpr auto relaxed = std::memory_order_relaxed; + _lastTickType.store(newState._lastTickType, relaxed); + _lastTickMs.store(newState._lastTickMs, relaxed); + _maxProcessingTimeSeenMs.store(newState._maxProcessingTimeSeenMs, relaxed); + _maxWaitTimeSeenMs.store(newState._maxWaitTimeSeenMs, relaxed); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h new file mode 100644 index 00000000000..f5dfb4237d5 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/storageframework/storageframework.h> +#include <array> +#include <atomic> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class ThreadPoolImpl; + +class ThreadImpl : public Thread +{ + struct BackendThread : public document::Runnable { + ThreadImpl& _impl; + BackendThread(ThreadImpl& impl) : _impl(impl) {} + virtual void run() { _impl.run(); } + }; + + /** + * Internal data race free implementation of tick data that maps to and + * from ThreadTickData. We hide the atomicity of this since atomic vars + * are not CopyConstructible and thus would impose unnecessary limitations + * on code using it. + */ + struct AtomicThreadTickData { + std::atomic<CycleType> _lastTickType; + std::atomic<uint64_t> _lastTickMs; + std::atomic<uint64_t> _maxProcessingTimeSeenMs; + std::atomic<uint64_t> _maxWaitTimeSeenMs; + // struct stores and loads are both data race free with relaxed + // memory semantics. This means it's possible to observe stale/partial + // state in a case with concurrent readers/writers. + ThreadTickData loadRelaxed() const noexcept; + void storeRelaxed(const ThreadTickData& newState) noexcept; + }; + + ThreadPoolImpl& _pool; + Runnable& _runnable; + ThreadProperties _properties; + std::array<AtomicThreadTickData, 3> _tickData; + uint32_t _tickDataPtr; + bool _interrupted; + bool _joined; + BackendThread _thread; + + void run(); + +public: + ThreadImpl(ThreadPoolImpl&, + Runnable&, + vespalib::stringref id, + uint64_t waitTimeMs, + uint64_t maxProcessTimeMs, + int ticksBeforeWait); + ~ThreadImpl(); + + virtual bool interrupted() const; + virtual bool joined() const; + virtual void interrupt(); + virtual void join(); + virtual void registerTick(CycleType, MilliSecTime); + virtual uint64_t getWaitTime() const { + return _properties.getWaitTime(); + } + virtual int getTicksBeforeWait() const { + return _properties.getTicksBeforeWait(); + } + virtual uint64_t getMaxProcessTime() const { + return _properties.getMaxProcessTime(); + } + + virtual void updateParameters(uint64_t waitTime, + uint64_t maxProcessTime, + int ticksBeforeWait); + + void setTickData(const ThreadTickData&); + ThreadTickData getTickData() const; + const ThreadProperties& getProperties() const + { return _properties; } +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp new file mode 100644 index 00000000000..baf289613f0 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> + +#include <iostream> +#include <vespa/storageframework/defaultimplementation/thread/threadimpl.h> +#include <vespa/vespalib/util/exceptions.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +ThreadPoolImpl::ThreadPoolImpl(Clock& clock) + : _backendThreadPool(512 * 1024), + _clock(clock), + _stopping(false) +{ +} + +ThreadPoolImpl::~ThreadPoolImpl() +{ + { + vespalib::LockGuard lock(_threadVectorLock); + _stopping = true; + for (uint32_t i=0, n=_threads.size(); i<n; ++i) { + _threads[i]->interrupt(); + } + for (uint32_t i=0, n=_threads.size(); i<n; ++i) { + _threads[i]->join(); + } + } + for (uint32_t i=0; true; i+=10) { + { + vespalib::LockGuard lock(_threadVectorLock); + if (_threads.empty()) break; + } + if (i > 1000) { + std::cerr << "Failed to kill thread pool. Threads won't die. (And " + << "if allowing thread pool object to be deleted this " + << "will create a segfault later)\n"; + assert(false); + } + FastOS_Thread::Sleep(10); + } + _backendThreadPool.Close(); +} + +Thread::UP +ThreadPoolImpl::startThread(Runnable& runnable, + vespalib::stringref id, + uint64_t waitTimeMs, + uint64_t maxProcessTime, + int ticksBeforeWait) +{ + vespalib::LockGuard lock(_threadVectorLock); + if (_stopping) { + throw vespalib::IllegalStateException( + "Threadpool is stopping", VESPA_STRLOC); + } + ThreadImpl* ti; + Thread::UP t(ti = new ThreadImpl( + *this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait)); + _threads.push_back(ti); + return t; +} + +void +ThreadPoolImpl::visitThreads(ThreadVisitor& visitor) const +{ + vespalib::LockGuard lock(_threadVectorLock); + for (uint32_t i=0, n=_threads.size(); i<n; ++i) { + visitor.visitThread(_threads[i]->getId(), _threads[i]->getProperties(), + _threads[i]->getTickData()); + } +} + +void +ThreadPoolImpl::unregisterThread(ThreadImpl& t) +{ + vespalib::LockGuard lock(_threadVectorLock); + std::vector<ThreadImpl*> threads; + threads.reserve(_threads.size()); + for (uint32_t i=0, n=_threads.size(); i<n; ++i) { + if (_threads[i] != &t) { + threads.push_back(_threads[i]); + } + } + _threads.swap(threads); + } + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h new file mode 100644 index 00000000000..785cc5e27b7 --- /dev/null +++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + + +#pragma once + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/storageframework.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +class ThreadImpl; + +struct ThreadPoolImpl : public ThreadPool +{ + FastOS_ThreadPool _backendThreadPool; + std::vector<ThreadImpl*> _threads; + vespalib::Lock _threadVectorLock; + Clock& _clock; + bool _stopping; + +public: + ThreadPoolImpl(Clock&); + ~ThreadPoolImpl(); + + Thread::UP startThread(Runnable&, + vespalib::stringref id, + uint64_t waitTimeMs, + uint64_t maxProcessTime, + int ticksBeforeWait); + void visitThreads(ThreadVisitor&) const; + + void registerThread(ThreadImpl&); + void unregisterThread(ThreadImpl&); + FastOS_ThreadPool& getThreadPool() { return _backendThreadPool; } + Clock& getClock() { return _clock; } + +}; + +} // defaultimplementation +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/.gitignore b/storageframework/src/vespa/storageframework/generic/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/CMakeLists.txt new file mode 100644 index 00000000000..c4ecd9f7920 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/CMakeLists.txt @@ -0,0 +1,11 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_generic + SOURCES + $<TARGET_OBJECTS:storageframework_component> + $<TARGET_OBJECTS:storageframework_status> + $<TARGET_OBJECTS:storageframework_thread> + $<TARGET_OBJECTS:storageframework_memory> + $<TARGET_OBJECTS:storageframework_clock> + INSTALL lib64 + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/clock/.gitignore b/storageframework/src/vespa/storageframework/generic/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/clock/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/clock/CMakeLists.txt new file mode 100644 index 00000000000..96cf4eba5aa --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_clock OBJECT + SOURCES + time.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/clock/clock.h b/storageframework/src/vespa/storageframework/generic/clock/clock.h new file mode 100644 index 00000000000..bd9415a5a8c --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/clock.h @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Clock + * \ingroup clock + * + * \brief Class used to attain current time. + * + * This class wraps how the time is retrieved. A common clock is useful in order + * to let unit tests fake time. It is also useful to have one point for all + * time calculations, such that one can possibly optimize if time retrieval + * becomes a bottle neck. + */ + +#pragma once + +#include <memory> +#include <vespa/storageframework/generic/clock/time.h> + +namespace storage { +namespace framework { + +struct Clock { + typedef std::unique_ptr<Clock> UP; + + virtual ~Clock() {} + + virtual MicroSecTime getTimeInMicros() const = 0; + virtual MilliSecTime getTimeInMillis() const = 0; + virtual SecondTime getTimeInSeconds() const = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.cpp b/storageframework/src/vespa/storageframework/generic/clock/time.cpp new file mode 100644 index 00000000000..4a379124bda --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/time.cpp @@ -0,0 +1,87 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/clock/time.h> + +#include <iomanip> +#include <sstream> +#include <vespa/storageframework/generic/clock/clock.h> +#include <sys/time.h> +#include <vector> + +namespace storage { +namespace framework { + +namespace { + void detectUnit(uint64_t& val, const char* unit, uint64_t size, + std::vector<std::pair<uint64_t, vespalib::string> >& units) { + if (val / size > 0) { + uint64_t value = val / size; + vespalib::string unitname = unit; + if (value != 1) unitname += "s"; + units.push_back(std::make_pair(value, unitname)); + val -= value * size; + } + } +} + +vespalib::string +getTimeString(uint64_t microSecondTime, TimeFormat format) +{ + // Rewrite to use other type of stream later if needed for performance + std::ostringstream ost; + if (format & DIFFERENCE_ALL) { + std::vector<std::pair<uint64_t, vespalib::string> > vals; + detectUnit(microSecondTime, "day", 24 * 60 * 60 * 1000 * 1000ull, vals); + detectUnit(microSecondTime, "hour", 60 * 60 * 1000 * 1000ull, vals); + detectUnit(microSecondTime, "minute", 60 * 1000 * 1000, vals); + detectUnit(microSecondTime, "second", 1000 * 1000, vals); + if (format & DIFFERENCE_WITH_MICROS) { + detectUnit(microSecondTime, "microsecond", 1, vals); + if (vals.empty()) { ost << "0 microseconds"; } + } else { + if (vals.empty()) { ost << "0 seconds"; } + } + if (vals.empty()) { + return vespalib::string(ost.str().c_str()); + } + ost << vals[0].first << " " << vals[0].second; + for (uint32_t i=1; i<vals.size(); ++i) { + if (i + 1 >= vals.size()) { + ost << " and "; + } else { + ost << ", "; + } + ost << vals[i].first << " " << vals[i].second; + } + return vespalib::string(ost.str().c_str()); + } + time_t secondTime = microSecondTime / 1000000; + struct tm datestruct; + struct tm* datestructptr = gmtime_r(&secondTime, &datestruct); + assert(datestructptr); + (void) datestructptr; + ost << std::setfill('0') + << std::setw(4) << (datestruct.tm_year + 1900) + << '-' << std::setw(2) << (datestruct.tm_mon + 1) + << '-' << std::setw(2) << datestruct.tm_mday + << ' ' << std::setw(2) << datestruct.tm_hour + << ':' << std::setw(2) << datestruct.tm_min + << ':' << std::setw(2) << datestruct.tm_sec; + uint64_t micros = microSecondTime % 1000000; + if (format == DATETIME_WITH_MILLIS) { + ost << '.' << std::setw(3) << (micros / 1000); + } else if (format == DATETIME_WITH_MICROS) { + ost << '.' << std::setw(6) << micros; + } + return vespalib::string(ost.str().c_str()); +} + +uint64_t +getRawMicroTime(const Clock& clock) +{ + return clock.getTimeInMicros().getTime(); +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/clock/time.h b/storageframework/src/vespa/storageframework/generic/clock/time.h new file mode 100644 index 00000000000..e09a4d5ef92 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/time.h @@ -0,0 +1,172 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/operators.hpp> +#include <limits> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/stllike/asciistream.h> + +namespace storage { +namespace framework { + +class Clock; + +enum TimeFormat { + DATETIME = 0x01, // 2010-04-26 19:23:03 + DATETIME_WITH_MILLIS = 0x02, // 2010-04-26 19:23:03.001 + DATETIME_WITH_MICROS = 0x04, // 2010-04-26 19:23:03.001023 + DATETIME_ALL = 0x07, + DIFFERENCE = 0x10, // 1 day, 4 hours, 43 minutes and 3 seconds + DIFFERENCE_WITH_MICROS = 0x20, // 1 day, 4 hours, 43 minutes, 3 seconds and 123123 microseconds + DIFFERENCE_ALL = 0x30 +}; + +/** + * Utility function used by Time instances (to avoid implementation in + * header file). + */ +vespalib::string getTimeString(uint64_t microSecondTime, TimeFormat format); + +// As this class can't include clock, this utility function can be used in +// header implementation to get actual time. +uint64_t getRawMicroTime(const Clock&); + +/** + * Class containing common functionality for the various time instances. Try to + * make time instances as easy to use as possible, without creating risk of + * automatic conversion between time types. + */ +template<typename Type, int MicrosPerUnit> +class Time : public boost::operators<Type> +{ + uint64_t _time; // time_t may be signed. Negative timestamps is just a + // source for bugs. Enforce unsigned. + +protected: + Time(uint64_t t) : _time(t) {} + +public: + uint64_t getTime() const { return _time; } + void setTime(uint64_t t) { _time = t; } + bool isSet() const { return (_time != 0); } + + Type& operator-=(const Type& o) + { _time -= o._time; return static_cast<Type&>(*this); } + Type& operator+=(const Type& o) + { _time += o._time; return static_cast<Type&>(*this); } + bool operator<(const Type& o) const { return (_time < o._time); } + bool operator==(const Type& o) const { return (_time == o._time); } + Type& operator++() { ++_time; return static_cast<Type&>(*this); } + Type& operator--() { --_time; return *this; } + + Type getDiff(const Type& o) const { + return Type(_time > o._time ? _time - o._time : o._time - _time); + } + + vespalib::string toString(TimeFormat timeFormat = DATETIME) const { + return getTimeString(_time * MicrosPerUnit, timeFormat); + } + + static Type max() { return Type(std::numeric_limits<uint64_t>().max()); } + static Type min() { return Type(0); } + +}; + +template<typename Type, typename Number> +Type& operator/(Type& type, Number n) { + type.setTime(type.getTime() / n); + return type; +} + +template<typename Type, typename Number> +Type& operator*(Type& type, Number n) { + type.setTime(type.getTime() * n); + return type; +} + +template<typename Type, int MPU> +std::ostream& operator<<(std::ostream& out, const Time<Type, MPU>& t) { + return out << t.getTime(); +} + +template<typename Type, int MPU> +vespalib::asciistream& operator<<(vespalib::asciistream& out, const Time<Type, MPU>& t) { + return out << t.getTime(); +} + +class MicroSecTime; +class MilliSecTime; + +/** + * \class storage::framework::SecondTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in seconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct SecondTime : public Time<SecondTime, 1000000> { + explicit SecondTime(uint64_t t = 0) : Time<SecondTime, 1000000>(t) {} + explicit SecondTime(const Clock& clock) + : Time<SecondTime, 1000000>(getRawMicroTime(clock) / 1000000) {} + + MilliSecTime getMillis() const; + MicroSecTime getMicros() const; +}; + +/** + * \class storage::framework::MilliSecTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in milliseconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct MilliSecTime : public Time<MilliSecTime, 1000> { + explicit MilliSecTime(uint64_t t = 0) : Time<MilliSecTime, 1000>(t) {} + explicit MilliSecTime(const Clock& clock) + : Time<MilliSecTime, 1000>(getRawMicroTime(clock) / 1000) {} + + SecondTime getSeconds() const { return SecondTime(getTime() / 1000); } + MicroSecTime getMicros() const; +}; + +/** + * \class storage::framework::MicroSecTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in seconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct MicroSecTime : public Time<MicroSecTime, 1> { + explicit MicroSecTime(uint64_t t = 0) : Time<MicroSecTime, 1>(t) {} + explicit MicroSecTime(const Clock& clock) + : Time<MicroSecTime, 1>(getRawMicroTime(clock)) {} + + MilliSecTime getMillis() const { return MilliSecTime(getTime() / 1000); } + SecondTime getSeconds() const { return SecondTime(getTime() / 1000000); } +}; + +inline MilliSecTime SecondTime::getMillis() const { + return MilliSecTime(getTime() * 1000); +} + +inline MicroSecTime SecondTime::getMicros() const { + return MicroSecTime(getTime() * 1000 * 1000); +} + +inline MicroSecTime MilliSecTime::getMicros() const { + return MicroSecTime(getTime() * 1000); +} + +} // framework +} // storage + + diff --git a/storageframework/src/vespa/storageframework/generic/clock/timer.h b/storageframework/src/vespa/storageframework/generic/clock/timer.h new file mode 100644 index 00000000000..66ff8cdef15 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/clock/timer.h @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Timer + * \ingroup clock + * + * \brief Class used to measure time differences. + * + * This timer class is a simple class that then used as a time_t instance, it + * will calculate time difference from some preset point of time. + */ + +#pragma once + +#include <vespa/storageframework/generic/clock/clock.h> + +namespace storage { +namespace framework { + +class MilliSecTimer { + const Clock* _clock; + time_t _startTime; + +public: + MilliSecTimer(const Clock& clock) + : _clock(&clock), _startTime(getCurrentTime()) {} + + MilliSecTime getTime() const { return MilliSecTime(getCurrentTime() - _startTime); } + operator time_t() const { return getCurrentTime() - _startTime; } + + time_t getCurrentTime() const + { return _clock->getTimeInMillis().getTime(); } +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/component/.gitignore b/storageframework/src/vespa/storageframework/generic/component/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/component/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/component/CMakeLists.txt new file mode 100644 index 00000000000..b6f84a537c1 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_component OBJECT + SOURCES + component.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp new file mode 100644 index 00000000000..ba741aade2a --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp @@ -0,0 +1,144 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/component/component.h> + +#include <vespa/storageframework/generic/component/componentregister.h> +#include <vespa/storageframework/generic/metric/metricregistrator.h> +#include <vespa/storageframework/generic/thread/threadpool.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { + +void +Component::open() +{ + if (_listener != 0) _listener->onOpen(); +} + +void +Component::close() +{ + if (_listener != 0) _listener->onClose(); +} + +Component::Component(ComponentRegister& cr, vespalib::stringref name) + : _componentRegister(&cr), + _name(name), + _status(0), + _metric(0), + _threadPool(0), + _metricReg(0), + _memoryManager(0), + _clock(0), + _listener(0) +{ + cr.registerComponent(*this); +} + +Component::~Component() +{ +} + +void +Component::registerComponentStateListener(ComponentStateListener& l) +{ + assert(_listener == 0); + _listener = &l; +} + +void +Component::registerStatusPage(const StatusReporter& sr) +{ + assert(_status == 0); + _status = &sr; +} + +void +Component::registerMetric(metrics::Metric& m) +{ + assert(_metric == 0); + _metric = &m; + if (_metricReg != 0) { + _metricReg->registerMetric(m); + } +} + +void +Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period) +{ + assert(_metricUpdateHook.first == 0); + _metricUpdateHook = std::make_pair(&hook, period); + if (_metricReg != 0) { + _metricReg->registerUpdateHook( + _name, *_metricUpdateHook.first, _metricUpdateHook.second); + } +} + +metrics::MetricLockGuard +Component::getMetricManagerLock() +{ + if (_metricReg != 0) { + return _metricReg->getMetricManagerLock(); + } else { + return metrics::MetricLockGuard(); + } +} + +void +Component::setMetricRegistrator(MetricRegistrator& mr) { + _metricReg = &mr; + if (_metricUpdateHook.first != 0) { + _metricReg->registerUpdateHook( + _name, *_metricUpdateHook.first, _metricUpdateHook.second); + } + if (_metric != 0) { + _metricReg->registerMetric(*_metric); + } +} + +ThreadPool& +Component::getThreadPool() const +{ + assert(_threadPool != 0); + return *_threadPool; +} + +MemoryManagerInterface& +Component::getMemoryManager() const +{ + assert(_memoryManager != 0); + return *_memoryManager; +} + +Clock& +Component::getClock() const +{ + assert(_clock != 0); + return *_clock; +} + + // Helper functions for components wanting to start a single thread. +Thread::UP +Component::startThread(Runnable& runnable, + MilliSecTime waitTime, + MilliSecTime maxProcessTime, + int ticksBeforeWait) +{ + return getThreadPool().startThread(runnable, + getName(), + waitTime.getTime(), + maxProcessTime.getTime(), + ticksBeforeWait); +} + +void +Component::requestShutdown(vespalib::stringref reason) +{ + _componentRegister->requestShutdown(reason); +} + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h new file mode 100644 index 00000000000..063b1102b13 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/component.h @@ -0,0 +1,223 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Component + * \ingroup component + * + * \brief Application component class + * + * The component class is used to give a component of an application a set of + * generic tools without depending on the implementation of these. + * + * This class should not depend on the actual implementation of what it serves. + * Neither in the header file nor the object file. Implementations will be + * provided by the application server. + * + * Services given should all be generic features. Application specific stuff + * we should handle in another way. The types of services offered are split in + * two. + * + * 1. Services component implementation has that it registers in the component + * such that application server can access this. Metrics and status + * reporting are two such services. + * 2. Services provided through application server, that the application + * server will inject into the component before it is opened for use. + * Clock, thread pool and memory management are examples of such services. + * + * The services offered with a short summary of what they provide are as + * follows: + * + * - Status reporters can register themselves as a reported in the component. + * A status server, for instance serving status information through a web + * server can thus fetch status pages wanted by clients and serve them. + * Status reporters thus don't need to know how status information is used. + * + * - A metric set can be registered, with a path for where in the application + * metric set it should exist. This way, the components do not have to know + * about metric management and the implementation of the metric manager. + * + * - A metric update hook can be registered. This will be called by the metric + * implementation at regular intervals or just before snapshotting/reporting. + * + * - A clock object is given. Using a common clock component all over the + * application makes us able to fake the clock in testing environments. + * Fetching current time is also a somewhat expensive operations we might + * do often, so having this common object to fetch it, we can easily + * optimize clock fetching as we see fit later. + * + * - A thread pool is given. This makes us able to use a thread pool. + * (Allthough currently we don't really need a thread pool, as threads + * typically live for the whole lifetime of the server. But currently we are + * forced to use a thread pool due to fastos.) Another feature of this is + * that the thread interface has built in information needed to detect + * deadlocks and report status about thread behavior, such that deadlock + * detecting and thread status can be shown without the threads themselves + * depending on how this is done. + * + * - A memory manager may also be provided, allowing components to request + * memory from a global pool, in order to let the application prioritize + * where to use memory. Again, this removes the dependency of how it is + * actually implemented to the components using it. + * + * Currently it is assumed that components are set up at application + * initialization time, and that they live as long as the application. Thus no + * unregister functionality is provided. Services that use registered status + * reporters or metric sets will shut down before the component is deleted, + * such that the component can be safely deleted without any unregistering + * needed. + */ +#pragma once + +#include <vespa/storageframework/generic/component/managedcomponent.h> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/storageframework/generic/thread/thread.h> +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/metrics/metricmanager.h> +#include <atomic> + +namespace storage { +namespace framework { + +class ComponentRegister; + +struct ComponentStateListener { + virtual ~ComponentStateListener() {} + + virtual void onOpen() {} + virtual void onClose() {} +}; + +class Component : private ManagedComponent +{ + ComponentRegister* _componentRegister; + vespalib::string _name; + const StatusReporter* _status; + metrics::Metric* _metric; + ThreadPool* _threadPool; + MetricRegistrator* _metricReg; + std::pair<MetricUpdateHook*, SecondTime> _metricUpdateHook; + MemoryManagerInterface* _memoryManager; + Clock* _clock; + ComponentStateListener* _listener; + std::atomic<UpgradeFlags> _upgradeFlag; + + UpgradeFlags loadUpgradeFlag() const { + return _upgradeFlag.load(std::memory_order_relaxed); + } + + // ManagedComponent implementation + metrics::Metric* getMetric() { return _metric; } + std::pair<MetricUpdateHook*, SecondTime> getMetricUpdateHook() + { return _metricUpdateHook; } + const StatusReporter* getStatusReporter() { return _status; } + void setMetricRegistrator(MetricRegistrator& mr); + void setMemoryManager(MemoryManagerInterface& mm) { _memoryManager = &mm; } + void setClock(Clock& c) { _clock = &c; } + void setThreadPool(ThreadPool& tp) { _threadPool = &tp; } + void setUpgradeFlag(UpgradeFlags flag) { + assert(_upgradeFlag.is_lock_free()); + _upgradeFlag.store(flag, std::memory_order_relaxed); + } + void open(); + void close(); + +public: + typedef std::unique_ptr<Component> UP; + + Component(ComponentRegister&, vespalib::stringref name); + virtual ~Component(); + + /** + * Register a component state listener, getting callbacks when components + * are started and stopped. An application might want to create all + * components before starting to do it's work. And it might stop doing work + * before starting to remove components. Using this listener, components + * may get callbacks in order to do some initialization after all components + * are set up, and to do some cleanups before other components are being + * removed. + */ + void registerComponentStateListener(ComponentStateListener&); + /** + * Register a status page, which might be visible to others through a + * component showing status of components. Only one status page can be + * registered per component. Use URI parameters in order to distinguish + * multiple pages. + */ + void registerStatusPage(const StatusReporter&); + + /** + * Register a metric (typically a metric set) used by this component. Only + * one metric set can be registered per component. Register a metric set in + * order to register many metrics within the component. + */ + void registerMetric(metrics::Metric&); + + /** + * Register a metric update hook. Only one per component. Note that the + * update hook will only be called if there actually is a metric mananger + * component registered in the application. + */ + void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period); + + /** + * If you need to modify the metric sets that have been registered, you need + * to hold the metric manager lock while you do it. + */ + metrics::MetricLockGuard getMetricManagerLock(); + + /** Get the name of the component. Must be a unique name. */ + const vespalib::string& getName() const { return _name; } + + /** + * Get the thread pool for this application. Note that this call will fail + * before the application has registered a threadpool. Applications are + * encouraged to register a threadpool before adding components to avoid + * needing components to wait before accessing threadpool. + */ + ThreadPool& getThreadPool() const; + + /** + * Get the memory manager used in this application if any. This function + * will fail before the application has registered a memory manager + * implementation. Applications using memory management are encouraged to + * register a memory manager before creating components to avoid needing + * components to delay using it. + */ + MemoryManagerInterface& getMemoryManager() const; + + /** + * Get the clock used in this application. This function will fail before + * the application has registered a clock implementation. Applications are + * encourated to register a clock implementation before adding components to + * avoid needing components to delay using it. + */ + Clock& getClock() const; + + /** + * Helper functions for components wanting to start a single thread. + * If max wait time is not set, we assume process time includes waiting. + * If max process time is not set, deadlock detector cannot detect deadlocks + * in this thread. (Thus one is not required to call registerTick()) + */ + Thread::UP startThread(Runnable&, + MilliSecTime maxProcessTime = MilliSecTime(0), + MilliSecTime waitTime = MilliSecTime(0), + int ticksBeforeWait = 1); + + // Check upgrade flag settings. Note that this flag may change at any time. + // Thus the results of these functions should not be cached. + bool isUpgradingToMajorVersion() const + { return (loadUpgradeFlag() == UPGRADING_TO_MAJOR_VERSION); } + bool isUpgradingToMinorVersion() const + { return (loadUpgradeFlag() == UPGRADING_TO_MINOR_VERSION); } + bool isUpgradingFromMajorVersion() const + { return (loadUpgradeFlag() == UPGRADING_FROM_MAJOR_VERSION); } + bool isUpgradingFromMinorVersion() const + { return (loadUpgradeFlag() == UPGRADING_FROM_MINOR_VERSION); } + + void requestShutdown(vespalib::stringref reason); + +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/component/componentregister.h b/storageframework/src/vespa/storageframework/generic/component/componentregister.h new file mode 100644 index 00000000000..d43f9faf4a0 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/componentregister.h @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ComponentRegister + * \ingroup component + * + * \brief Application server implements this to get overview of all components. + * + * By implementing this class, the application server will get all the + * components it needs to manage using an interface containing just what it + * needs to minimize dependencies. + */ +#pragma once + +namespace storage { +namespace framework { + +class ManagedComponent; + +struct ComponentRegister { + virtual ~ComponentRegister() {} + + virtual void registerComponent(ManagedComponent&) = 0; + virtual void requestShutdown(vespalib::stringref reason) = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/component/managedcomponent.h b/storageframework/src/vespa/storageframework/generic/component/managedcomponent.h new file mode 100644 index 00000000000..6934a9fab32 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/component/managedcomponent.h @@ -0,0 +1,80 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ManagedComponent + * \ingroup component + * + * \brief Interface to expose to manager of components. + * + * As to not make the functions needed by the component manager exposed to the + * component implementation, and vice versa, this interface exist to be what + * the manager is interested in. That way, component implementation can + * implement that privately, but expose it to the component register. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/stllike/string.h> + +namespace metrics { + class Metric; +} + +namespace storage { +namespace framework { + +class StatusReporter; +class MemoryManagerInterface; +class MetricRegistrator; +class MetricUpdateHook; +class ThreadPool; +class Clock; + +/** + * The upgrade flags can be used to add forward/backward compatability. In most + * cases, we can hopefully ignore this as next version is compatible. In some + * cases the new version might need to avoid doing requests the old version + * can't handle. In rare cases, the older version might have gotten some forward + * compatability code added which it might need to activate during an upgrade. + * + * Note that these flags must be set in an application when an upgrade requiring + * this is being performed. Upgrade docs should specify this if needed. + */ +enum UpgradeFlags { + // Indicates we're either not upgrading, or we're upgrading compatible + // versions so we doesn't need any special handling. + NO_UPGRADE_SPECIAL_HANDLING_ACTIVE, + // The cluster is being upgraded to this major version. We might need to + // send old type of messages to make older nodes understand what we send + UPGRADING_TO_MAJOR_VERSION, + // The cluster is being upgraded to this minor version. We might need to + // send old type of messages to make older nodes understand what we send + UPGRADING_TO_MINOR_VERSION, + // The cluster is being upgraded to the next major version. We might + // need to refrain from using functionality removed in the new version. + UPGRADING_FROM_MAJOR_VERSION, + // The cluster is being upgraded to the next minor version. We might + // need to refrain from using functionality removed in the new version. + UPGRADING_FROM_MINOR_VERSION +}; + +struct ManagedComponent { + virtual ~ManagedComponent() {} + + virtual const vespalib::string& getName() const = 0; + virtual metrics::Metric* getMetric() = 0; + virtual std::pair<MetricUpdateHook*, SecondTime> getMetricUpdateHook() = 0; + virtual const StatusReporter* getStatusReporter() = 0; + + virtual void setMetricRegistrator(MetricRegistrator&) = 0; + virtual void setMemoryManager(MemoryManagerInterface&) = 0; + virtual void setClock(Clock&) = 0; + virtual void setThreadPool(ThreadPool&) = 0; + virtual void setUpgradeFlag(UpgradeFlags flag) = 0; + virtual void open() = 0; + virtual void close() = 0; + +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/memory/.gitignore b/storageframework/src/vespa/storageframework/generic/memory/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/memory/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/memory/CMakeLists.txt new file mode 100644 index 00000000000..8be02926511 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_memory OBJECT + SOURCES + memorytoken.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/memory/memoryallocationtype.h b/storageframework/src/vespa/storageframework/generic/memory/memoryallocationtype.h new file mode 100644 index 00000000000..13ac09604bf --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/memoryallocationtype.h @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MemoryAllocationType + * \ingroup memory + * + * \brief Allocation types used to differ between memory manager clients. + * + * The different memory manager clients have different properties. It is + * important for the memory manager to distinguish between different users in + * order to know how to prioritize memory, and also in order to create good + * reports on memory usage. + * + * An allocation type holds metadata for a memory manager client, including a + * name for the type and various properties that may affect how much memory + * such a client will get, whether it always gets some, etc. + */ + +#pragma once + +#include <string> +#include <vespa/vespalib/util/linkedptr.h> + +namespace storage { +namespace framework { + +struct MemoryAllocationType { + typedef vespalib::LinkedPtr<MemoryAllocationType> LP; + + enum Flags { + NONE = 0x00, + FORCE_ALLOCATE = 0x01, + EXTERNAL_LOAD = 0x02, + CACHE = 0x04 + }; + + MemoryAllocationType() + : _flags(NONE), _name("") {}; + + MemoryAllocationType(const std::string& name, uint32_t flags = NONE) + : _flags(flags), _name(name) {} + + const std::string& getName() const { return _name; } + bool isAllocationsForced() const { return (_flags & FORCE_ALLOCATE); } + bool isExternalLoad() const { return (_flags & EXTERNAL_LOAD); } + bool isCache() const { return (_flags & CACHE); } + +private: + uint32_t _flags; + std::string _name; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/memory/memorymanagerinterface.h b/storageframework/src/vespa/storageframework/generic/memory/memorymanagerinterface.h new file mode 100644 index 00000000000..623b9e5ab1f --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/memorymanagerinterface.h @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MemoryManagerInterface + * \ingroup memory + * + * \brief Interface with functions clients need in order to use a memory manager + * + * This interface exist so clients can use a memory manager without actually + * depending on the implementation of it. + */ + +#pragma once + +#include <vespa/storageframework/generic/memory/memoryallocationtype.h> +#include <vespa/storageframework/generic/memory/memorytoken.h> +#include <vespa/storageframework/generic/memory/reducememoryusageinterface.h> + +namespace storage { +namespace framework { + +struct MemoryManagerInterface +{ + typedef std::unique_ptr<MemoryManagerInterface> UP; + + virtual ~MemoryManagerInterface() {} + + virtual void setMaximumMemoryUsage(uint64_t max) = 0; + + /** + * Registers the given allocation type by copying it, and returning + * a reference to the copied object. + */ + virtual const MemoryAllocationType& + registerAllocationType(const MemoryAllocationType& type) = 0; + + /** Throws exception if failing to find type. */ + virtual const MemoryAllocationType& + getAllocationType(const std::string& name) const = 0; + + /** Get an overview of all registration types. */ + virtual std::vector<const MemoryAllocationType*> + getAllocationTypes() const = 0; + + /** + * Decide how much to allocate for this request. Should be between min + * and max, unless it's of a type that can be denied (such as external + * requests), in which case we can also deny allocation by returning a null + * token. + */ + virtual MemoryToken::UP allocate( + const MemoryAllocationType&, + uint64_t min, + uint64_t max, + uint8_t priority, + ReduceMemoryUsageInterface* = 0) = 0; + + /** + * Utility function to see how much memory is available. + */ + virtual uint64_t getMemorySizeFreeForPriority(uint8_t priority) const = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/memory/memorytoken.cpp b/storageframework/src/vespa/storageframework/generic/memory/memorytoken.cpp new file mode 100644 index 00000000000..d2e9376c51d --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/memorytoken.cpp @@ -0,0 +1,14 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/memory/memorytoken.h> + +namespace storage { +namespace framework { + +MemoryToken::~MemoryToken() +{ +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/memory/memorytoken.h b/storageframework/src/vespa/storageframework/generic/memory/memorytoken.h new file mode 100644 index 00000000000..ef77d802877 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/memorytoken.h @@ -0,0 +1,35 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MemoryToken + * \ingroup memory + * + * \brief Token to keep by client for current allocations. + * + * This class is a token a memory manager client will get from the memory + * manager when getting memory. It can be used to know how much you currently + * have allocated, and through it you can request more or less memory. + */ + +#pragma once + +#include <vespa/fastos/fastos.h> +#include <memory> +#include <vespa/vespalib/util/linkedptr.h> + +namespace storage { +namespace framework { + +class MemoryToken { +protected: +public: + typedef std::unique_ptr<MemoryToken> UP; + typedef vespalib::LinkedPtr<MemoryToken> LP; + virtual ~MemoryToken(); + + virtual uint64_t getSize() const = 0; + virtual bool resize(uint64_t min, uint64_t max) = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/memory/reducememoryusageinterface.h b/storageframework/src/vespa/storageframework/generic/memory/reducememoryusageinterface.h new file mode 100644 index 00000000000..5baaeb2c700 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/memory/reducememoryusageinterface.h @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ReduceMemoryUsageInterface + * \ingroup memory + * + * \brief The manager can take memory back when needed using this interface. + * + * Some memory users, typically caches, wants to use all available memory. But + * to let them use all available memory, it must also be easy to take memory + * back when needed for something else. An implementation of this interface can + * be given on memory allocations to give the memory manager the ability to take + * memory back when needed. + */ + +#pragma once + +namespace storage { +namespace framework { + +struct ReduceMemoryUsageInterface +{ + virtual ~ReduceMemoryUsageInterface() {} + + /** + * This callback is called when the memory manager want to reduce the usage + * of the given memory token. Actual memory to be released should be + * released in this function. The token itself will be adjusted by the + * memory manager though. The memory manager may keep a lock through this + * call, so no memory manager calls should be made inside this callback. + * + * It is recommended that you actually release at least as many bytes as + * requested. Though currently it is allowed to reduce less or refuse, but + * this might mean that some higher priority task does not get the memory it + * needs. + * + * @param reduceBy Always in the range 0 < reduceBy <= token.size() + * @return The amount of memory no longer used. + */ + virtual uint64_t reduceMemoryConsumption(const MemoryToken&, + uint64_t reduceBy) = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/metric/.gitignore b/storageframework/src/vespa/storageframework/generic/metric/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/metric/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/metric/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/metric/CMakeLists.txt new file mode 100644 index 00000000000..3e4d2d57055 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/metric/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_metric INTERFACE + SOURCES + INSTALL lib64 + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h new file mode 100644 index 00000000000..d300a4508af --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/metric/metricregistrator.h @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MetricRegistrator + * \ingroup metric + * + * \brief Interface used to register a metric in the backend. + * + * To avoid needing the framework module to depend on the metric system (in + * case any users don't use it), this class exist to remove this dependency. + */ +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <vespa/metrics/metricmanager.h> + +namespace storage { +namespace framework { + +struct MetricRegistrator { + virtual ~MetricRegistrator() {} + + virtual void registerMetric(metrics::Metric&) = 0; + virtual void registerUpdateHook(vespalib::stringref name, + MetricUpdateHook& hook, + SecondTime period) = 0; + virtual metrics::MetricLockGuard getMetricManagerLock() = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/metric/metricupdatehook.h b/storageframework/src/vespa/storageframework/generic/metric/metricupdatehook.h new file mode 100644 index 00000000000..741e0ade787 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/metric/metricupdatehook.h @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MetricUpdateHook + * \ingroup metric + * + * \brief Implement to get callbacks to update metrics periodically or just before reports/snapshots. + */ +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <vespa/metrics/metricmanager.h> + +namespace storage { +namespace framework { + +struct MetricUpdateHook { + typedef metrics::MetricManager::UpdateHook::MetricLockGuard MetricLockGuard; + virtual ~MetricUpdateHook() {} + + virtual void updateMetrics(const MetricLockGuard &) = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/status/.gitignore b/storageframework/src/vespa/storageframework/generic/status/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/status/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/status/CMakeLists.txt new file mode 100644 index 00000000000..e2b79e1dea4 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_status OBJECT + SOURCES + statusreporter.cpp + htmlstatusreporter.cpp + xmlstatusreporter.cpp + httpurlpath.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp b/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp new file mode 100644 index 00000000000..25ee0315dff --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> + +namespace storage { +namespace framework { + +HtmlStatusReporter::HtmlStatusReporter(vespalib::stringref id, + vespalib::stringref name) + : StatusReporter(id, name) +{ +} + +HtmlStatusReporter::~HtmlStatusReporter() +{ +} + +void +HtmlStatusReporter::reportHtmlHeader(std::ostream& out, + const HttpUrlPath& path) const +{ + out << "<html>\n" + << "<head>\n" + << " <title>" << getName() << "</title>\n"; + reportHtmlHeaderAdditions(out, path); + out << "</head>\n" + << "<body>\n" + << " <h1>" << getName() << "</h1>\n"; +} + +void +HtmlStatusReporter::reportHtmlFooter(std::ostream& out, + const HttpUrlPath&) const +{ + out << "</body>\n</html>\n"; +} + +vespalib::string +HtmlStatusReporter::getReportContentType(const HttpUrlPath&) const +{ + return "text/html"; +} + +bool +HtmlStatusReporter::reportStatus(std::ostream& out, + const HttpUrlPath& path) const +{ + if (!isValidStatusRequest()) return false; + reportHtmlHeader(out, path); + reportHtmlStatus(out, path); + reportHtmlFooter(out, path); + return true; +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.h b/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.h new file mode 100644 index 00000000000..c76ec879121 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/htmlstatusreporter.h @@ -0,0 +1,67 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::HtmlStatusReporter + * \ingroup component + * + * \brief Specialization of StatusReporter for reporters of HTML data. + * + * To avoid code duplication, and to let all HTML status reporters be able + * to look consistently, this specialization exist to have a common place to + * implement common HTML parts printed. + * + * Note: If you want to write HTTP from a status reporter that can also write + * other types of content, best practise is to instantiate the + * PartlyHtmlStatusReporter to print the HTML headers and footers. + */ + +#pragma once + +#include <vespa/storageframework/generic/status/statusreporter.h> + +namespace storage { +namespace framework { + +struct HtmlStatusReporter : public StatusReporter { + HtmlStatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~HtmlStatusReporter(); + + /** + * The default HTML header writer uses this function to allow page to add + * some code in the <head></head> part of the HTML, such as javascript + * functions. + */ + virtual void reportHtmlHeaderAdditions(std::ostream&, + const HttpUrlPath&) const {} + + /** + * Write a default HTML header. It writes the start of an HTML + * file, including a body statement and a header with component name. + */ + virtual void reportHtmlHeader(std::ostream&, const HttpUrlPath&) const; + + /** Overwrite to write the actual HTML content. */ + virtual void reportHtmlStatus(std::ostream&, const HttpUrlPath&) const = 0; + + /** Writes a default HTML footer. Includes closing the body tag. */ + virtual void reportHtmlFooter(std::ostream&, const HttpUrlPath&) const; + + // Implementation of StatusReporter interface + virtual vespalib::string getReportContentType(const HttpUrlPath&) const; + virtual bool reportStatus(std::ostream&, const HttpUrlPath&) const; +}; + +/** + * This class can be used if your status reporter only reports HTML in some + * instances. Then you can create an instance of this class in order to write + * the HTML headers and footers when needed. + */ +struct PartlyHtmlStatusReporter : public HtmlStatusReporter { + PartlyHtmlStatusReporter(const StatusReporter& main) + : HtmlStatusReporter(main.getId(), main.getName()) {} + + virtual void reportHtmlStatus(std::ostream&, const HttpUrlPath&) const {} +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/status/httpurlpath.cpp b/storageframework/src/vespa/storageframework/generic/status/httpurlpath.cpp new file mode 100644 index 00000000000..6baa9a81ebe --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/httpurlpath.cpp @@ -0,0 +1,73 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/status/httpurlpath.h> +#include <stdint.h> + +namespace storage { +namespace framework { + +HttpUrlPath::HttpUrlPath(const vespalib::string& urlpath) + : _urlPath(urlpath), + _path(), + _attributes(), + _serverSpec() +{ + init(); +} + +HttpUrlPath::HttpUrlPath(const vespalib::string& urlpath, + const vespalib::string& serverSpec) + : _urlPath(urlpath), + _path(), + _attributes(), + _serverSpec(serverSpec) +{ + init(); +} + +void +HttpUrlPath::init() +{ + vespalib::string::size_type pos = _urlPath.find('?'); + if (pos == vespalib::string::npos) { + _path = _urlPath; + } else { + _path = _urlPath.substr(0, pos); + vespalib::string sub(_urlPath.substr(pos+1)); + vespalib::StringTokenizer tokenizer(sub, "&", ""); + for (uint32_t i=0, n=tokenizer.size(); i<n; ++i) { + const vespalib::string& s(tokenizer[i]); + pos = s.find('='); + if (pos == vespalib::string::npos) { + _attributes[s] = ""; + } else { + _attributes[s.substr(0,pos)] = s.substr(pos+1); + } + } + } +} + +bool +HttpUrlPath::hasAttribute(const vespalib::string& id) const +{ + return (_attributes.find(id) != _attributes.end()); +} + +const vespalib::string& +HttpUrlPath::getAttribute(const vespalib::string& id, + const vespalib::string& defaultValue) const +{ + std::map<vespalib::string, vespalib::string>::const_iterator it + = _attributes.find(id); + return (it == _attributes.end() ? defaultValue : it->second); +} + +void +HttpUrlPath::print(std::ostream& out, bool, const std::string&) const +{ + out << _urlPath; +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/status/httpurlpath.h b/storageframework/src/vespa/storageframework/generic/status/httpurlpath.h new file mode 100644 index 00000000000..a535edb5180 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/httpurlpath.h @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Utility class to parse the url-path part of an HTTP URL. + * Used by status module. + */ + +#pragma once + +#include <vespa/vespalib/util/printable.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/text/stringtokenizer.h> +#include <map> +#include <string> + +namespace storage { +namespace framework { + +class HttpUrlPath : public vespalib::Printable { + vespalib::string _urlPath; + vespalib::string _path; + std::map<vespalib::string, vespalib::string> _attributes; + vespalib::string _serverSpec; // "host:port" + + void init(); + +public: + HttpUrlPath(const vespalib::string& urlpath); + HttpUrlPath(const vespalib::string& urlpath, const vespalib::string& serverSpec); + + const vespalib::string& getPath() const { return _path; } + const std::map<vespalib::string, vespalib::string>& getAttributes() const + { return _attributes; } + + bool hasAttribute(const vespalib::string& id) const; + const vespalib::string& getAttribute(const vespalib::string& id, + const vespalib::string& defaultValue = "") const; + + const vespalib::string& getServerSpec() const { + return _serverSpec; + } + + template<typename T> + T get(const vespalib::string& id, const T& defaultValue = T()) const; + + virtual void print(std::ostream& out, bool verbose, + const std::string& indent) const; +}; + +template<typename T> +T HttpUrlPath::get(const vespalib::string& id, const T& defaultValue) const +{ + std::map<vespalib::string, vespalib::string>::const_iterator it + = _attributes.find(id); + if (it == _attributes.end()) return defaultValue; + T val; + std::istringstream ist(it->second); + ist >> val; + return val; +} + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/status/statusreporter.cpp b/storageframework/src/vespa/storageframework/generic/status/statusreporter.cpp new file mode 100644 index 00000000000..a5cc9b8e7ee --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/statusreporter.cpp @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/status/statusreporter.h> + +namespace storage { +namespace framework { + +StatusReporter::StatusReporter(vespalib::stringref id, vespalib::stringref name) + : _id(id), + _name(name) +{ +} + +StatusReporter::~StatusReporter() +{ +} + +bool +StatusReporter::reportHttpHeader(std::ostream& out, + const HttpUrlPath& path) const +{ + vespalib::string contentType(getReportContentType(path)); + if (contentType == "") return false; + out << "HTTP/1.1 200 OK\r\n" + "Connection: Close\r\n" + "Content-type: " << contentType << "\r\n\r\n"; + return true; +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/status/statusreporter.h b/storageframework/src/vespa/storageframework/generic/status/statusreporter.h new file mode 100644 index 00000000000..0d9aa3a8e4a --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/statusreporter.h @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::StatusReporter + * \ingroup component + * + * \brief Interface to implement for status reporters. + * + * Components that wants to make status pages available can implement this + * interface in order to provide status information without depending on how + * this information is server. Status data is typically available through an + * HTTP server running in the process. + * + * Specializations of this interface exists for HTML and XML outputters. + */ +#pragma once + +#include <ostream> +#include <vespa/storageframework/generic/status/httpurlpath.h> +#include <vespa/vespalib/stllike/string.h> + +namespace storage { +namespace framework { + +struct StatusReporter +{ + StatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~StatusReporter(); + + /** + * Get the identifier. The identifier is a string matching regex + * ^[A-Za-z0-9_]+$. It is used to identify the status page in contexts where + * special characters are not wanted, such as in an URL. + */ + const vespalib::string& getId() const { return _id; } + /** + * Get the descriptive name of the status reported. This string should be + * able to contain anything. + */ + const vespalib::string& getName() const { return _name; } + + virtual bool isValidStatusRequest() const { return true; } + + /** + * Overwrite to get full control of header writes. + * Returning false means the page specified by path does not exist. + */ + virtual bool reportHttpHeader(std::ostream&, const HttpUrlPath&) const; + /** + * Called by default writeHttpHeader implementation to get content type. + * An empty string indicates page not found. + */ + virtual vespalib::string getReportContentType(const HttpUrlPath&) const = 0; + + /** + * Called to get the actual content to return in the status request. + * @return False if no such page exist, in which case you should not have + * written to the output stream. + */ + virtual bool reportStatus(std::ostream&, const HttpUrlPath&) const = 0; + +private: + vespalib::string _id; + vespalib::string _name; + +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/status/statusreportermap.h b/storageframework/src/vespa/storageframework/generic/status/statusreportermap.h new file mode 100644 index 00000000000..5d7c3383c31 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/statusreportermap.h @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::StatusReporterMap + * \ingroup status + * + * \brief Interface to access the various status reporters + */ +#pragma once + +#include <vespa/storageframework/generic/status/statusreporter.h> + +namespace storage { +namespace framework { + +struct StatusReporterMap { + virtual ~StatusReporterMap() {} + + virtual const StatusReporter* getStatusReporter(vespalib::stringref id) = 0; + + virtual std::vector<const StatusReporter*> getStatusReporters() = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp b/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp new file mode 100644 index 00000000000..abb70ae6aca --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp @@ -0,0 +1,61 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/status/xmlstatusreporter.h> + +namespace storage { +namespace framework { + +XmlStatusReporter::XmlStatusReporter(vespalib::stringref id, + vespalib::stringref name) + : StatusReporter(id, name) +{ +} + +XmlStatusReporter::~XmlStatusReporter() +{ +} + +void +XmlStatusReporter::initXmlReport(vespalib::XmlOutputStream& xos, + const HttpUrlPath&) const +{ + using namespace vespalib::xml; + xos << XmlTag("status") + << XmlAttribute("id", getId()) + << XmlAttribute("name", getName()); +} + +void +XmlStatusReporter::finalizeXmlReport(vespalib::XmlOutputStream& xos, + const HttpUrlPath&) const +{ + using namespace vespalib::xml; + xos << XmlEndTag(); + assert(xos.isFinalized()); +} + +vespalib::string +XmlStatusReporter::getReportContentType(const HttpUrlPath&) const +{ + return "application/xml"; +} + +bool +XmlStatusReporter::reportStatus(std::ostream& out, + const HttpUrlPath& path) const +{ + out << "<?xml version=\"1.0\"?>\n"; + vespalib::XmlOutputStream xos(out); + initXmlReport(xos, path); + vespalib::stringref failure = reportXmlStatus(xos, path); + if (!failure.empty()) { + using namespace vespalib::xml; + xos << XmlContent("Failed to report XML status: " + failure); + } + finalizeXmlReport(xos, path); + return failure.empty(); +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.h b/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.h new file mode 100644 index 00000000000..0516989aaac --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/status/xmlstatusreporter.h @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::XmlStatusReporter + * \ingroup component + * + * \brief Specialization of StatusReporter for reporters of XML data. + * + * To make it easy to write legal XML and escape content that needs to be + * escaped, an XML writer is used to write the actual XML data. + * + * Note: If you want to write XML from a status reporter that can also write + * other types of content, best practise is to implement StatusReporter, and if + * serving XML in the reportStatus function, create a temporary + * XmlStatusReporter object, in order to reuse the report functions to init + * and finalize XML writing. + */ + +#pragma once + +#include <vespa/storageframework/generic/status/statusreporter.h> +#include <vespa/vespalib/util/xmlserializable.h> + +namespace vespalib { +} + +namespace storage { +namespace framework { + +struct XmlStatusReporter : public StatusReporter { + XmlStatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~XmlStatusReporter(); + + virtual void initXmlReport(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const; + + /** + * @return Empty string if ok, otherwise indicate a failure condition. + */ + virtual vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const = 0; + + virtual void finalizeXmlReport(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const; + + // Implementation of status reporter interface + virtual vespalib::string getReportContentType(const HttpUrlPath&) const; + virtual bool reportStatus(std::ostream&, const HttpUrlPath&) const; +}; + +/** + * If you're only reporting XML in some cases, you can use this instance to + * wrap the actual XML parts, so you can reuse the code that outputs the XML. + * Just use output operator in this class to add the actual XML. + */ +class PartlyXmlStatusReporter : public XmlStatusReporter { + vespalib::XmlOutputStream _xos; + const HttpUrlPath& _path; + +public: + PartlyXmlStatusReporter(const StatusReporter& main, std::ostream& out, + const HttpUrlPath& path) + : XmlStatusReporter(main.getId(), main.getName()), + _xos(out), + _path(path) + { + initXmlReport(_xos, path); + } + + ~PartlyXmlStatusReporter() { + finalizeXmlReport(_xos, _path); + } + + vespalib::XmlOutputStream& getStream() { return _xos; } + + virtual vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const + { + return ""; + } + + template<typename T> + PartlyXmlStatusReporter& operator<<(const T& v) { + _xos << v; + return *this; + } +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/thread/.gitignore b/storageframework/src/vespa/storageframework/generic/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/vespa/storageframework/generic/thread/CMakeLists.txt b/storageframework/src/vespa/storageframework/generic/thread/CMakeLists.txt new file mode 100644 index 00000000000..020bdb1f5b7 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_thread OBJECT + SOURCES + thread.cpp + threadpool.cpp + tickingthread.cpp + DEPENDS +) diff --git a/storageframework/src/vespa/storageframework/generic/thread/runnable.h b/storageframework/src/vespa/storageframework/generic/thread/runnable.h new file mode 100644 index 00000000000..0e6186b546a --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/runnable.h @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Runnable + * \ingroup thread + * + * \brief Minimal API for something that can be run. + * + * Minimum API to implement to be able to be run by a thread. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/time.h> + +namespace storage { +namespace framework { + +/** + * A cycle type can be given when registering ticks. This is useful for + * monitoring, to see the difference between cycles that is just waiting and + * cycles that are processing. If this information is known, the monitoring + * tools can see that the longest process cycle have been 5 ms, even though + * the thread is waiting for 1000 ms when it is idle. + */ +enum CycleType { UNKNOWN_CYCLE, WAIT_CYCLE, PROCESS_CYCLE }; +const char* getCycleTypeName(CycleType); + +struct ThreadHandle { + virtual ~ThreadHandle() {} + + /** Check whether thread have been interrupted or not. */ + virtual bool interrupted() const = 0; + + /** + * Register a tick. Useful such that a deadlock detector can detect that + * threads are actually doing something. If cycle types are specified, + * deadlock detector can specifically know what thread has been doing and + * used appropriate max limit. On unknown cycles, less information is + * available, and deadlock detector will use sum of wait and process time. + * + * The cycle type specified is for the cycle that just passed. + * + * @param currentTime Callers can set current time such that backend does + * not need to calculate clock. (Too avoid additional + * clock fetches if client already knows current time) + */ + virtual void registerTick(CycleType = UNKNOWN_CYCLE, + MilliSecTime currentTime = MilliSecTime(0)) = 0; + + virtual uint64_t getWaitTime() const = 0; + + virtual uint64_t getMaxProcessTime() const = 0; + + /** + * The number of ticks done before wait is called when no more work is + * reported. + */ + virtual int getTicksBeforeWait() const = 0; +}; + +struct Runnable { + virtual ~Runnable() {} + + virtual void run(ThreadHandle&) = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/thread/taskthread.h b/storageframework/src/vespa/storageframework/generic/thread/taskthread.h new file mode 100644 index 00000000000..39456d0f56a --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/taskthread.h @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Implementation of ticking threads for performing prioritized tasks. + * Implements critical section and a prioritized queue for communication + * outside of thread. + * + * Note that doNonCriticalTick is not implemented to call a processTask() + * function, as applications might want to do something even if there is no + * task, prioritize something above a task at some time, or process multiple + * tasks in one tick (to reduce locking overhead).. Thus we expect most apps to + * want to implement doNonCriticalTick() anyhow, so rather we just make + * available functions for peeking and extracting tasks. + */ + +#pragma once + +#include <queue> +#include <vespa/storageframework/generic/thread/tickingthread.h> + +namespace storage { +namespace framework { + +template <typename Task> +class TaskThread : public TickingThread { + ThreadLock& _lock; + std::vector<Task> _enqueued; + std::priority_queue<Task> _tasks; + +public: + TaskThread(ThreadLock& lock); + + void addTask(const Task& t); + virtual ThreadWaitInfo doCriticalTick(ThreadIndex); + + bool empty() const { return _tasks.empty(); } + const Task& peek() const { return _tasks.top(); } + void pop() { _tasks.pop(); } + +private: + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) = 0; +}; + +template <typename Task> +TaskThread<Task>::TaskThread(ThreadLock& lock) + : _lock(lock) +{ +} + +template <typename Task> +void +TaskThread<Task>::addTask(const Task& t) +{ + TickingLockGuard lock(_lock.freezeCriticalTicks()); + _enqueued.push_back(t); + lock.broadcast(); +} + +template <typename Task> +ThreadWaitInfo +TaskThread<Task>::doCriticalTick(ThreadIndex) { + std::vector<Task> enqueued; + enqueued.swap(_enqueued); + for (size_t i=0, n=enqueued.size(); i<n; ++i) { + _tasks.push(enqueued[i]); + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; +} + + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp new file mode 100644 index 00000000000..d943cc66bb7 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/thread/thread.h> + +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { + +void +Thread::interruptAndJoin(vespalib::Monitor* m) +{ + interrupt(); + if (m != 0) { + vespalib::MonitorGuard monitorGuard(*m); + monitorGuard.broadcast(); + } + join(); +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h new file mode 100644 index 00000000000..e30e9b10404 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -0,0 +1,67 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Thread + * \ingroup thread + * + * \brief A wrapper for a thread class. + * + * This thread class exist to hide the actual implementation of threads used, + * and to give some extra information about the threads. This is in turned used + * by monitoring, to be able to see data about the threads running. One such + * monitoring tool is the deadlock detector. + */ +#pragma once + +#include <memory> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/storageframework/generic/thread/runnable.h> + +namespace vespalib { + class Monitor; +} + +namespace storage { +namespace framework { + +class Thread : public ThreadHandle { + vespalib::string _id; + +public: + typedef std::unique_ptr<Thread> UP; + + Thread(vespalib::stringref id) : _id(id) {} + virtual ~Thread() {} + + virtual const vespalib::string& getId() const { return _id; } + + /** Check whether thread have been interrupted or not. */ + virtual bool interrupted() const = 0; + /** Check whether thread have been joined or not. */ + virtual bool joined() const = 0; + + /** + * Call this function to set interrupt flag, such that later calls to + * interrupt returns true. If called on already interrupted thread it is a + * noop. + */ + virtual void interrupt() = 0; + /** + * Call this function to wait until thread has finished processing. If + * called after thread has already finished, it is a noop. + */ + virtual void join() = 0; + + virtual void updateParameters(uint64_t waitTime, + uint64_t maxProcessTime, + int ticksBeforeWait) = 0; + + /** + * Utility function to interrupt and join a thread, possibly broadcasting + * through a monitor after the signalling face. + */ + void interruptAndJoin(vespalib::Monitor* m); +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp new file mode 100644 index 00000000000..bea27961edc --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.cpp @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/thread/threadpool.h> + +namespace storage { +namespace framework { + +ThreadProperties::ThreadProperties(uint64_t waitTimeMs, + uint64_t maxProcessTimeMs, + int ticksBeforeWait) +{ + _waitTimeMs.store(waitTimeMs); + _maxProcessTimeMs.store(maxProcessTimeMs); + _ticksBeforeWait.store(ticksBeforeWait); +} + +uint64_t ThreadProperties::getMaxProcessTime() const { + return _maxProcessTimeMs.load(std::memory_order_relaxed); +} + +uint64_t ThreadProperties::getWaitTime() const { + return _waitTimeMs.load(std::memory_order_relaxed); +} + +int ThreadProperties::getTicksBeforeWait() const { + return _ticksBeforeWait.load(std::memory_order_relaxed); +} + +void ThreadProperties::setMaxProcessTime(uint64_t maxProcessingTimeMs) { + _maxProcessTimeMs.store(maxProcessingTimeMs); +} + +void ThreadProperties::setWaitTime(uint64_t waitTimeMs) { + _waitTimeMs.store(waitTimeMs); +} + +void ThreadProperties::setTicksBeforeWait(int ticksBeforeWait) { + _ticksBeforeWait.store(ticksBeforeWait); +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/thread/threadpool.h b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h new file mode 100644 index 00000000000..1cecd24bdc3 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/threadpool.h @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ThreadPool + * \ingroup thread + * + * \brief A threadpool implementation usable by storage components. + * + * Using this threadpool interface, we can use a threadpool without depending + * on the actual implementation. Also, as information is provided of the + * threads, monitoring tools, like the deadlock detector can extract information + * about the threads. + */ +#pragma once + +#include <atomic> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/storageframework/generic/thread/thread.h> +#include <vespa/storageframework/generic/clock/time.h> +#include <vector> + +namespace storage { +namespace framework { + +/** + * Each thread may have different properties, as to how long they wait between + * ticks and how long they're supposed to use processing between ticks. To be + * able to specify this per thread, a set of properties can be set by each + * thread. + */ +class ThreadProperties { + private: + /** + * Time this thread should maximum use to process before a tick is + * registered. (Including wait time if wait time is not set) + */ + std::atomic_uint_least64_t _maxProcessTimeMs; + /** + * Time this thread will wait in a non-interrupted wait cycle. + * Used in cases where a wait cycle is registered. As long as no other + * time consuming stuff is done in a wait cycle, you can just use the + * wait time here. The deadlock detector should add a configurable + * global time period before flagging deadlock anyways. + */ + std::atomic_uint_least64_t _waitTimeMs; + /** + * Number of ticks to be done before a wait. + */ + std::atomic_uint _ticksBeforeWait; + + public: + ThreadProperties(uint64_t waitTimeMs, + uint64_t maxProcessTimeMs, + int ticksBeforeWait); + + void setMaxProcessTime(uint64_t); + void setWaitTime(uint64_t); + void setTicksBeforeWait(int); + + uint64_t getMaxProcessTime() const; + uint64_t getWaitTime() const; + int getTicksBeforeWait() const; + + uint64_t getMaxCycleTime() const { + return std::max(_maxProcessTimeMs.load(std::memory_order_relaxed), + _waitTimeMs.load(std::memory_order_relaxed)); + } +}; + +/** Data kept on each thread due to the registerTick functinality. */ +struct ThreadTickData { + CycleType _lastTickType; + uint64_t _lastTickMs; + uint64_t _maxProcessingTimeSeenMs; + uint64_t _maxWaitTimeSeenMs; +}; + +/** Interface used to access data for the existing threads. */ +struct ThreadVisitor { + virtual ~ThreadVisitor() {} + virtual void visitThread(const vespalib::string& id, + const ThreadProperties&, + const ThreadTickData&) = 0; +}; + +struct ThreadPool { + virtual ~ThreadPool() {} + + virtual Thread::UP startThread(Runnable&, + vespalib::stringref id, + uint64_t waitTimeMs, + uint64_t maxProcessTime, + int ticksBeforeWait) = 0; + + virtual void visitThreads(ThreadVisitor&) const = 0; +}; + +} // framework +} // storage + diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp new file mode 100644 index 00000000000..2c41e05d72c --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -0,0 +1,249 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storageframework/generic/thread/tickingthread.h> +#include <atomic> +#include <vespa/log/log.h> +#include <sstream> +#include <vespa/storageframework/generic/thread/threadpool.h> +#include <vespa/vespalib/util/exceptions.h> + +LOG_SETUP(".framework.thread.ticker"); + +namespace storage { +namespace framework { + +ThreadWaitInfo ThreadWaitInfo::MORE_WORK_ENQUEUED(false); +ThreadWaitInfo ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN(true); + +void +ThreadWaitInfo::merge(const ThreadWaitInfo& other) { + if (!other._waitWanted) _waitWanted = false; +} + +/** + * \brief Implementation actually doing lock handling, waiting, and allowing a + * global synchronization point where no thread is currently running. + */ +class TickingThreadRunner : public Runnable { + vespalib::Monitor& _monitor; + TickingThread& _tickingThread; + uint32_t _threadIndex; + bool _wantToFreeze; + bool _frozen; + char _state; + +public: + typedef std::shared_ptr<TickingThreadRunner> SP; + + TickingThreadRunner(vespalib::Monitor& m, + TickingThread& ticker, + uint32_t threadIndex) + : _monitor(m), _tickingThread(ticker), + _threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {} + + /** + * Call to freeze this thread. Returns then the thread has done executing + * tick and has frozen. + */ + void freeze() { + vespalib::MonitorGuard guard(_monitor); + _wantToFreeze = true; + while (!_frozen) { + guard.wait(); + } + } + + /** + * Call to thaw up a frozen thread so it can continue. + */ + void thaw() { + vespalib::MonitorGuard guard(_monitor); + _wantToFreeze = false; + guard.broadcast(); + } + + char getState() const { return _state; } + +private: + virtual void run(ThreadHandle& handle) { + ThreadWaitInfo info = ThreadWaitInfo::MORE_WORK_ENQUEUED; + CycleType cycle = PROCESS_CYCLE; + int ticksExecutedAfterWait = 0; + while (!handle.interrupted()) { + { + vespalib::MonitorGuard guard(_monitor); + if (info.waitWanted()) { + _state = 'w'; + cycle = WAIT_CYCLE; + if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { + guard.wait(handle.getWaitTime()); + ticksExecutedAfterWait = 0; + } + } + if (_wantToFreeze) { + _state = 'f'; + doFreeze(guard); + ticksExecutedAfterWait = 0; + } + _state = 'c'; + info.merge(_tickingThread.doCriticalTick(_threadIndex)); + _state = 'n'; + } + handle.registerTick(cycle); + ticksExecutedAfterWait++; + cycle = PROCESS_CYCLE; + info = _tickingThread.doNonCriticalTick(_threadIndex); + } + _state = 's'; + } + void doFreeze(vespalib::MonitorGuard& guard) { + _frozen = true; + guard.broadcast(); + while (_wantToFreeze) { + guard.wait(); + } + _frozen = false; + } +}; + +class TickingThreadPoolImpl : public TickingThreadPool { + vespalib::string _name; + vespalib::Monitor _monitor; + std::atomic_uint_least64_t _waitTime ; + std::atomic_uint _ticksBeforeWait; + std::atomic_uint_least64_t _maxProcessTime; + std::vector<TickingThreadRunner::SP> _tickers; + std::vector<std::shared_ptr<Thread> > _threads; + + struct FreezeGuard : public TickingLockGuard::Impl { + TickingThreadPoolImpl& _pool; + + FreezeGuard(TickingThreadPoolImpl& pool) + : _pool(pool) { _pool.freeze(); } + + virtual ~FreezeGuard() { _pool.thaw(); } + + virtual void broadcast() {} + }; + struct CriticalGuard : public TickingLockGuard::Impl { + vespalib::MonitorGuard _guard; + + CriticalGuard(vespalib::Monitor& m) : _guard(m) {} + + virtual void broadcast() { _guard.broadcast(); } + }; + +public: + TickingThreadPoolImpl(vespalib::stringref name, + MilliSecTime waitTime, + int ticksBeforeWait, + MilliSecTime maxProcessTime) + : _name(name), + _waitTime(waitTime.getTime()), + _ticksBeforeWait(ticksBeforeWait), + _maxProcessTime(maxProcessTime.getTime()) {} + + ~TickingThreadPoolImpl() { + stop(); + } + + virtual void updateParametersAllThreads( + MilliSecTime waitTime, + MilliSecTime maxProcessTime, + int ticksBeforeWait) { + _waitTime.store(waitTime.getTime()); + _maxProcessTime.store(maxProcessTime.getTime()); + _ticksBeforeWait.store(ticksBeforeWait); + // TODO: Add locking so threads not deleted while updating + for (uint32_t i=0; i<_threads.size(); ++i) { + _threads[i]->updateParameters(waitTime.getTime(), + maxProcessTime.getTime(), + ticksBeforeWait); + } + } + + void addThread(TickingThread& ticker) { + ThreadIndex index = _tickers.size(); + ticker.newThreadCreated(index); + _tickers.push_back(TickingThreadRunner::SP( + new TickingThreadRunner(_monitor, ticker, index))); + } + + void start(ThreadPool& pool) { + if (_tickers.empty()) { + throw vespalib::IllegalStateException( + "Makes no sense to start threadpool without threads", + VESPA_STRLOC); + } + for (uint32_t i=0; i<_tickers.size(); ++i) { + std::ostringstream ost; + ost << _name.c_str() << " thread " << i; + _threads.push_back(std::shared_ptr<Thread>(pool.startThread( + *_tickers[i], + ost.str(), + _waitTime.load(std::memory_order_relaxed), + _maxProcessTime.load(std::memory_order_relaxed), + _ticksBeforeWait.load(std::memory_order_relaxed)))); + } + } + + virtual TickingLockGuard freezeAllTicks() { + return TickingLockGuard(vespalib::LinkedPtr<TickingLockGuard::Impl>( + new FreezeGuard(*this))); + } + + virtual TickingLockGuard freezeCriticalTicks() { + return TickingLockGuard(vespalib::LinkedPtr<TickingLockGuard::Impl>( + new CriticalGuard(_monitor))); + } + + void stop() { + for (uint32_t i=0; i<_threads.size(); ++i) { + _threads[i]->interrupt(); + } + { + vespalib::MonitorGuard guard(_monitor); + guard.broadcast(); + } + for (uint32_t i=0; i<_threads.size(); ++i) { + _threads[i]->join(); + } + } + + vespalib::string getStatus() { + vespalib::string result(_tickers.size(), ' '); + for (uint32_t i=0, n=_tickers.size(); i<n; ++i) { + result[i] = _tickers[i]->getState(); + } + return result; + } + +private: + void freeze() { + for (uint32_t i=0; i<_tickers.size(); ++i) { + _tickers[i]->freeze(); + } + } + + void thaw() { + for (uint32_t i=0; i<_tickers.size(); ++i) { + _tickers[i]->thaw(); + } + } +}; + +TickingThreadPool::UP +TickingThreadPool::createDefault( + vespalib::stringref name, + MilliSecTime waitTime, + int ticksBeforeWait, + MilliSecTime maxProcessTime) +{ + return TickingThreadPool::UP(new TickingThreadPoolImpl( + name, + waitTime, + ticksBeforeWait, + maxProcessTime)); +} + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h new file mode 100644 index 00000000000..8357e50f401 --- /dev/null +++ b/storageframework/src/vespa/storageframework/generic/thread/tickingthread.h @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * This file contains a utility function to handle threads doing a lot of + * single ticks. It brings the following functionality: + * + * - Give application setting up the threads a way to synchronize all the + * threads so it can perform some operation while no thread is ticking. + * - Give multiple threads a way to use common lock for critical region, such + * that you can divide responsible between multiple threads, and still have + * a way to notify and wait for all. + * - Automatically implement registration in deadlock handler, and updating + * tick times there. + * - Give a thread specific context to tick functions, such that one class + * instance can be used for all threads. + * - Hide thread functionality for starting, stopping and running. + * - Minimizes locking by using a single lock that is taken only once per + * tick loop. + */ +#pragma once + +#include <memory> +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/linkedptr.h> +#include <vespa/vespalib/util/sync.h> + +namespace storage { +namespace framework { + +class ThreadPool; +typedef uint32_t ThreadIndex; + +/** + * \brief Information returned from tick functions to indicate whether thread + * should throttle a bit or not. + */ +class ThreadWaitInfo { + bool _waitWanted; + ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {} + +public: + static ThreadWaitInfo MORE_WORK_ENQUEUED; + static ThreadWaitInfo NO_MORE_CRITICAL_WORK_KNOWN; + + void merge(const ThreadWaitInfo& other); + bool waitWanted() { return _waitWanted; } +}; + +/** + * \brief Simple superclass to implement for ticking threads. + */ +struct TickingThread { + virtual ~TickingThread() {} + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex) = 0; + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) = 0; + virtual void newThreadCreated(ThreadIndex) {} +}; + +/** \brief Delete to allow threads to tick again. */ +struct TickingLockGuard { + struct Impl { + virtual ~Impl() {} + virtual void broadcast() = 0; + }; + TickingLockGuard(vespalib::LinkedPtr<Impl> impl) : _impl(impl) {} + void broadcast() { _impl->broadcast(); } +private: + vespalib::LinkedPtr<Impl> _impl; +}; + +struct ThreadLock { + virtual ~ThreadLock() { } + virtual TickingLockGuard freezeAllTicks() = 0; + virtual TickingLockGuard freezeCriticalTicks() = 0; +}; + +/** + * \brief Thread pool set up by the application to control the threads. + */ +struct TickingThreadPool : public ThreadLock { + typedef std::unique_ptr<TickingThreadPool> UP; + + static TickingThreadPool::UP createDefault( + vespalib::stringref name, + MilliSecTime waitTime = MilliSecTime(5), + int ticksBeforeWait = 1, + MilliSecTime maxProcessTime = SecondTime(5).getMillis()); + + virtual void updateParametersAllThreads( + MilliSecTime waitTime, + MilliSecTime maxProcessTime, + int ticksBeforeWait) = 0; + + + virtual ~TickingThreadPool() {} + + /** All threads must be added before starting the threads. */ + virtual void addThread(TickingThread& ticker) = 0; + /** Start all the threads added. */ + virtual void start(ThreadPool& pool) = 0; + virtual void stop() = 0; + virtual vespalib::string getStatus() = 0; +}; + +} // framework +} // storage diff --git a/storageframework/src/vespa/storageframework/storageframework.h b/storageframework/src/vespa/storageframework/storageframework.h new file mode 100644 index 00000000000..9ac5de9fafe --- /dev/null +++ b/storageframework/src/vespa/storageframework/storageframework.h @@ -0,0 +1,17 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \file storageframework.h + * + * This file includes the most common parts used by the framework. + */ + +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/storageframework/generic/clock/timer.h> +#include <vespa/storageframework/generic/component/component.h> +#include <vespa/storageframework/generic/memory/memorymanagerinterface.h> +#include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> +#include <vespa/storageframework/generic/status/statusreportermap.h> +#include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/storageframework/generic/thread/threadpool.h> + |