diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /storageframework/src/tests |
Publish
Diffstat (limited to 'storageframework/src/tests')
17 files changed, 1193 insertions, 0 deletions
diff --git a/storageframework/src/tests/.gitignore b/storageframework/src/tests/.gitignore new file mode 100644 index 00000000000..9124b7b5751 --- /dev/null +++ b/storageframework/src/tests/.gitignore @@ -0,0 +1,5 @@ +/.depend +/Makefile +/test.vlog +/testrunner +storageframework_testrunner_app diff --git a/storageframework/src/tests/CMakeLists.txt b/storageframework/src/tests/CMakeLists.txt new file mode 100644 index 00000000000..863d5381d20 --- /dev/null +++ b/storageframework/src/tests/CMakeLists.txt @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(storageframework_testrunner_app + SOURCES + testrunner.cpp + DEPENDS + storageframework_testclock + storageframework_teststatus + storageframework_testmemory + storageframework_testthread + storageframework +) +vespa_add_test(NAME storageframework_testrunner_app COMMAND storageframework_testrunner_app) diff --git a/storageframework/src/tests/clock/.gitignore b/storageframework/src/tests/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/clock/CMakeLists.txt b/storageframework/src/tests/clock/CMakeLists.txt new file mode 100644 index 00000000000..1134cf8e88a --- /dev/null +++ b/storageframework/src/tests/clock/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testclock + SOURCES + timetest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/clock/timetest.cpp b/storageframework/src/tests/clock/timetest.cpp new file mode 100644 index 00000000000..c7319667ea8 --- /dev/null +++ b/storageframework/src/tests/clock/timetest.cpp @@ -0,0 +1,85 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct TimeTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testBasics(); + void testCreatedFromClock(); + void canAssignMicrosecondResolutionTimeToFakeClock(); + + CPPUNIT_TEST_SUITE(TimeTest); + CPPUNIT_TEST(testBasics); // Fails sometimes, test needs rewrite. + CPPUNIT_TEST(testCreatedFromClock); + CPPUNIT_TEST(canAssignMicrosecondResolutionTimeToFakeClock); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TimeTest); + +void +TimeTest::testBasics() +{ + SecondTime timeSec(1); + + MilliSecTime timeMillis = timeSec.getMillis(); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), timeMillis.getTime()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMillis.getSeconds()); + + MicroSecTime timeMicros = timeSec.getMicros(); + CPPUNIT_ASSERT_EQUAL(timeSec.getMicros(), timeMillis.getMicros()); + CPPUNIT_ASSERT_EQUAL(timeMillis, timeMicros.getMillis()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMicros.getSeconds()); + + MicroSecTime timeMicros2 = timeMicros; + CPPUNIT_ASSERT(timeMicros2 == timeMicros); + timeMicros2 += MicroSecTime(25000); + CPPUNIT_ASSERT(timeMicros2 > timeMicros); + CPPUNIT_ASSERT(timeMicros < timeMicros2); + timeMicros2 -= MicroSecTime(30000); + CPPUNIT_ASSERT(timeMicros2 < timeMicros); + CPPUNIT_ASSERT(timeMicros > timeMicros2); + timeMicros2 += MicroSecTime(55000); + + MilliSecTime timeMillis2 = timeMicros2.getMillis(); + CPPUNIT_ASSERT(timeMillis2 > timeMillis); + CPPUNIT_ASSERT_EQUAL(uint64_t(1050), timeMillis2.getTime()); + CPPUNIT_ASSERT_EQUAL(timeSec, timeMillis2.getSeconds()); +} + +void +TimeTest::testCreatedFromClock() +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInSeconds(600); + + CPPUNIT_ASSERT_EQUAL(SecondTime(600), SecondTime(clock)); + CPPUNIT_ASSERT_EQUAL(MilliSecTime(600 * 1000), MilliSecTime(clock)); + CPPUNIT_ASSERT_EQUAL(MicroSecTime(600 * 1000 * 1000), MicroSecTime(clock)); +} + +void +TimeTest::canAssignMicrosecondResolutionTimeToFakeClock() +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInMicroSeconds(1234567); // 1.234567 seconds + + // All non-microsec time points must necessarily be truncated. + CPPUNIT_ASSERT_EQUAL(SecondTime(1), SecondTime(clock)); + CPPUNIT_ASSERT_EQUAL(MilliSecTime(1234), MilliSecTime(clock)); + CPPUNIT_ASSERT_EQUAL(MicroSecTime(1234567), MicroSecTime(clock)); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/memory/.gitignore b/storageframework/src/tests/memory/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/memory/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/memory/CMakeLists.txt b/storageframework/src/tests/memory/CMakeLists.txt new file mode 100644 index 00000000000..97a7314f4cb --- /dev/null +++ b/storageframework/src/tests/memory/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testmemory + SOURCES + memorymanagertest.cpp + memorystatetest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/memory/memorymanagertest.cpp b/storageframework/src/tests/memory/memorymanagertest.cpp new file mode 100644 index 00000000000..3675cc55c2d --- /dev/null +++ b/storageframework/src/tests/memory/memorymanagertest.cpp @@ -0,0 +1,400 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/memory/memorymanager.h> +#include <vespa/storageframework/defaultimplementation/memory/simplememorylogic.h> +#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <vespa/vespalib/util/random.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct MemoryManagerTest : public CppUnit::TestFixture +{ + void testBasics(); + void testCacheAllocation(); + void testStress(); + + CPPUNIT_TEST_SUITE(MemoryManagerTest); + CPPUNIT_TEST(testBasics); + CPPUNIT_TEST(testCacheAllocation); + CPPUNIT_TEST(testStress); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MemoryManagerTest); + +void +MemoryManagerTest::testBasics() +{ + uint64_t maxMemory = 1000; + RealClock clock; + SimpleMemoryLogic* logic = new SimpleMemoryLogic(clock, maxMemory); + AllocationLogic::UP allLogic(std::move(logic)); + MemoryManager manager(std::move(allLogic)); + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("put", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& getAlloc(manager.registerAllocationType( + MemoryAllocationType("get", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& bufAlloc(manager.registerAllocationType( + MemoryAllocationType("buffer"))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("cache", MemoryAllocationType::CACHE))); + const MemoryState& state(logic->getState()); + const MemoryState::SnapShot& current(state.getCurrentSnapshot()); + // Basics + { + // * Getting a token, and release it back with correct behavior + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 0, 100, 80); + CPPUNIT_ASSERT(put.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(100), put->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(100), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(900), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + + // * Do the same while not being empty. Different type. + framework::MemoryToken::UP get = manager.allocate(getAlloc, + 30, 200, 50); + CPPUNIT_ASSERT(get.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), get->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(300), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(700), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + + // * Do the same while not being empty. Same type. + framework::MemoryToken::UP get2 = manager.allocate( + getAlloc, + 70, + 150, + 60); + + CPPUNIT_ASSERT(get2.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(150), get2->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(450), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(550), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Non-external load + // * Getting minimum when going beyond 80% full + { + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 795, 795, 90); + framework::MemoryToken::UP resize = manager.allocate( + bufAlloc, 10, 90, 80); + CPPUNIT_ASSERT(resize.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(10), resize->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(805), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(195), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Non-external load + // * Getting up to threshold if hitting it + { + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 750, 750, 90); + framework::MemoryToken::UP resize = manager.allocate( + bufAlloc, 10, 90, 80); + CPPUNIT_ASSERT(resize.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(50), resize->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(800), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // External load + { + // * Stopped when going beyond 80% full + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 795, 795, 90); + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 10, 100, 80); + CPPUNIT_ASSERT(put.get() == 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(795), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(205), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // External load + { + // * Getting up to threshold if hitting it + framework::MemoryToken::UP filler = manager.allocate(putAlloc, + 750, 750, 90); + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 10, 100, 80); + CPPUNIT_ASSERT(put.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(50), put->getSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(800), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(200), state.getFreeSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(1000), state.getTotalSize()); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + + // Test caching.. + { + // Cache paradigm: + // Allocate a token taking up no space at all. + // Give it to your ReduceMemoryUsageInterface implementation. + // Run resize on your token in that implementation to get memory and + // return memory. That way locking should be easy when needed. + struct ReduceI : public framework::ReduceMemoryUsageInterface { + framework::MemoryToken::UP _token; + + virtual uint64_t reduceMemoryConsumption(const MemoryToken& token, + uint64_t reduceBy) + { + assert(&token == _token.get()); + (void) &token; + assert(_token->getSize() >= reduceBy); + return reduceBy; + } + }; + ReduceI reducer; + framework::MemoryToken::UP cache = manager.allocate(cacheAlloc, + 0, 0, 0, &reducer); + CPPUNIT_ASSERT(cache.get() != 0); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), cache->getSize()); + reducer._token = std::move(cache); + for (uint32_t i=1; i<=50; ++i) { + bool success = reducer._token->resize(i * 10, i * 10); + CPPUNIT_ASSERT_EQUAL(true, success); + } + CPPUNIT_ASSERT_EQUAL(uint64_t(500), reducer._token->getSize()); + + // * Ordered to free space + framework::MemoryToken::UP put = manager.allocate(putAlloc, + 600, 600, 80); + CPPUNIT_ASSERT_EQUAL_MSG(manager.toString(), + uint64_t(400), reducer._token->getSize()); + CPPUNIT_ASSERT_EQUAL_MSG(manager.toString(), + uint64_t(600), put->getSize()); + } + CPPUNIT_ASSERT_EQUAL_MSG(state.toString(true), + uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL_MSG(state.toString(true), + uint64_t(0), current.getUserCount()); + + // Test merge and tracking of allocation counts with merge, by doing + // operations with tokens and see that user count and used size + // correctly go back to zero. + { + framework::MemoryToken::UP tok1( + manager.allocate(putAlloc, 5, 5, 40)); + framework::MemoryToken::UP tok2( + manager.allocate(putAlloc, 10, 10, 40)); + framework::MemoryToken::UP tok3( + manager.allocate(putAlloc, 20, 20, 40)); + framework::MemoryToken::UP tok4( + manager.allocate(putAlloc, 40, 40, 40)); + framework::MemoryToken::UP tok5( + manager.allocate(putAlloc, 80, 80, 40)); + framework::MemoryToken::UP tok6( + manager.allocate(putAlloc, 1, 1, 40)); + framework::MemoryToken::UP tok7( + manager.allocate(putAlloc, 3, 3, 40)); + } +} + +void +MemoryManagerTest::testCacheAllocation() +{ + uint64_t maxMemory = 3000; + + RealClock clock; + SimpleMemoryLogic::UP logic(new PriorityMemoryLogic(clock, maxMemory)); + logic->setCacheThreshold(1.0); + + AllocationLogic::UP allLogic(std::move(logic)); + MemoryManager manager(std::move(allLogic)); + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("put", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("cache", MemoryAllocationType::CACHE))); + + framework::MemoryToken::UP token = + manager.allocate(putAlloc, + 50, + 50, + 127); + + CPPUNIT_ASSERT_EQUAL(50, (int)token->getSize()); + + framework::MemoryToken::UP token2 = + manager.allocate(cacheAlloc, + 1000, + 2000, + 127); + + CPPUNIT_ASSERT_EQUAL(2000, (int)token2->getSize()); + + token2->resize(2000, 3000); + + CPPUNIT_ASSERT_EQUAL(2950, (int)token2->getSize()); +} + +namespace { +struct MemoryManagerLoadGiver : public document::Runnable, + public ReduceMemoryUsageInterface +{ + MemoryManager& _manager; + const framework::MemoryAllocationType& _type; + uint8_t _priority; + uint32_t _minMem; + uint32_t _maxMem; + uint32_t _failed; + uint32_t _ok; + uint32_t _reduced; + typedef vespalib::LinkedPtr<MemoryToken> MemoryTokenPtr; + std::vector<MemoryTokenPtr> _tokens; + vespalib::Lock _cacheLock; + + MemoryManagerLoadGiver( + MemoryManager& manager, + const framework::MemoryAllocationType& type, + uint8_t priority, + uint32_t minMem, + uint32_t maxMem, + uint32_t tokensToKeep) + : _manager(manager), + _type(type), + _priority(priority), + _minMem(minMem), + _maxMem(maxMem), + _failed(0), + _ok(0), + _reduced(0), + _tokens(tokensToKeep) + { + } + + uint64_t reduceMemoryConsumption(const MemoryToken&, uint64_t reduceBy) { + ++_reduced; + return reduceBy; + } + + void run() { + ReduceMemoryUsageInterface* reducer = 0; + if (_type.isCache()) reducer = this; + vespalib::RandomGen randomizer; + while (running()) { + vespalib::Lock lock(_cacheLock); + framework::MemoryToken::UP token = _manager.allocate( + _type, _minMem, _maxMem, _priority, reducer); + if (token.get() == 0) { + ++_failed; + } else { + ++_ok; + } + uint32_t index = randomizer.nextUint32(0, _tokens.size() - 1); + _tokens[index] = MemoryTokenPtr(token.release()); + } + } +}; +} + +void +MemoryManagerTest::testStress() +{ + uint64_t stressTimeMS = 1 * 1000; + uint64_t maxMemory = 1 * 1024 * 1024; + RealClock clock; + AllocationLogic::UP logic(new PriorityMemoryLogic(clock, maxMemory)); + MemoryManager manager(std::move(logic)); + + FastOS_ThreadPool pool(128 * 1024); + std::vector<MemoryManagerLoadGiver*> loadGivers; + for (uint32_t type = 0; type < 5; ++type) { + const MemoryAllocationType* allocType = 0; + uint32_t min = 1000, max = 5000; + if (type == 0) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "default")); + } else if (type == 1) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "external", MemoryAllocationType::EXTERNAL_LOAD)); + } else if (type == 2) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "forced", MemoryAllocationType::FORCE_ALLOCATE)); + } else if (type == 3) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "forcedExternal", MemoryAllocationType::FORCE_ALLOCATE + | MemoryAllocationType::EXTERNAL_LOAD)); + } else if (type == 4) { + allocType = &manager.registerAllocationType(MemoryAllocationType( + "cache", MemoryAllocationType::CACHE)); + max = 30000; + } + for (int priority = 0; priority < 256; priority += 8) { + loadGivers.push_back(new MemoryManagerLoadGiver( + manager, *allocType, priority, min, max, 10)); + loadGivers.back()->start(pool); + } + FastOS_Thread::Sleep(stressTimeMS); + } + FastOS_Thread::Sleep(5 * stressTimeMS); + uint64_t okTotal = 0, failedTotal = 0, reducedTotal = 0; + for (uint32_t i = 0; i < loadGivers.size(); i++) { + /* + fprintf(stderr, "%d %s-%u: Failed %d, ok %d, reduced %d\n", + i, loadGivers[i]->_type.getName().c_str(), + uint32_t(loadGivers[i]->_priority), + loadGivers[i]->_failed, loadGivers[i]->_ok, + loadGivers[i]->_reduced); // */ + okTotal += loadGivers[i]->_ok; + failedTotal += loadGivers[i]->_failed; + reducedTotal += loadGivers[i]->_reduced; + } + for (uint32_t i = 0; i < loadGivers.size(); i++) loadGivers[i]->stop(); + for (uint32_t i = 0; i < loadGivers.size(); i++) loadGivers[i]->join(); + pool.Close(); + + /* + bool verbose = false; + std::cerr << "\n\nMemory allocations at end of load:\n"; + manager.print(std::cerr, verbose, ""); // */ + + for (uint32_t i = 0; i < loadGivers.size(); i++) { + loadGivers[i]->_tokens.clear(); + } + for (uint32_t i = 0; i < loadGivers.size(); i++) { + delete loadGivers[i]; + } + loadGivers.clear(); + + //std::cerr << "\n\nMemory allocations at end of testl:\n"; + //manager.print(std::cerr, verbose, ""); + + std::cerr << "\n Managed " << std::fixed + << (okTotal / (stressTimeMS / 1000)) + << " ok, " << (failedTotal / (stressTimeMS / 1000)) + << " failed and " << (reducedTotal / (stressTimeMS / 1000)) + << " reduced allocations/s.\n "; + + MemoryState state(clock, 1); + manager.getState(state); + const MemoryState::SnapShot& current(state.getCurrentSnapshot()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUserCount()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSize()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), current.getUsedSizeIgnoringCache()); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/memory/memorystatetest.cpp b/storageframework/src/tests/memory/memorystatetest.cpp new file mode 100644 index 00000000000..9703d7d0133 --- /dev/null +++ b/storageframework/src/tests/memory/memorystatetest.cpp @@ -0,0 +1,176 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/memory/memorystate.h> +#include <vespa/storageframework/generic/memory/memorymanagerinterface.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct MemoryStateTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testBasics(); + + CPPUNIT_TEST_SUITE(MemoryStateTest); + CPPUNIT_TEST(testBasics); // Fails sometimes, test needs rewrite. + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(MemoryStateTest); + +class SimpleMemoryManager : public framework::MemoryManagerInterface +{ +private: + std::map<std::string, framework::MemoryAllocationType> _types; + +public: + virtual void setMaximumMemoryUsage(uint64_t max) { (void) max; } + + virtual const framework::MemoryAllocationType& + registerAllocationType(const framework::MemoryAllocationType& type) { + _types[type.getName()] = type; + return _types[type.getName()]; + } + + virtual const framework::MemoryAllocationType& + getAllocationType(const std::string& name) const { + std::map<std::string, framework::MemoryAllocationType>::const_iterator iter = + _types.find(name); + + if (iter == _types.end()) { + throw vespalib::IllegalArgumentException("Allocation type not found: " + name); + } + + return iter->second; + } + + virtual std::vector<const MemoryAllocationType*> getAllocationTypes() const + { + std::vector<const MemoryAllocationType*> types; + for(std::map<std::string, framework::MemoryAllocationType> + ::const_iterator it = _types.begin(); it != _types.end(); ++it) + { + types.push_back(&it->second); + } + return types; + } + + framework::MemoryToken::UP allocate(const framework::MemoryAllocationType&, + uint64_t, + uint64_t, + uint8_t, + framework::ReduceMemoryUsageInterface*) + { + return framework::MemoryToken::UP(); + } + + uint64_t getMemorySizeFreeForPriority(uint8_t priority) const { + (void) priority; + return 0; + } +}; + +void +MemoryStateTest::testBasics() +{ + SimpleMemoryManager manager; + + const MemoryAllocationType& putAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_PUT", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& getAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_GET", MemoryAllocationType::EXTERNAL_LOAD))); + const MemoryAllocationType& blockAlloc(manager.registerAllocationType( + MemoryAllocationType("MESSAGE_DOCBLOCK"))); + const MemoryAllocationType& databaseAlloc(manager.registerAllocationType( + MemoryAllocationType("DATABASE"))); + const MemoryAllocationType& cacheAlloc(manager.registerAllocationType( + MemoryAllocationType("SLOTFILE_CACHE", MemoryAllocationType::CACHE))); + + uint32_t maxMemory = 1024; + + RealClock clock; + MemoryState state1(clock, maxMemory); + MemoryState state2(clock, maxMemory); + + state1.setMinJumpToUpdateMax(50); + + state1.addToEntry(putAlloc, 100, 10, + MemoryState::GOT_MAX, false); + state1.addToEntry(putAlloc, 100, 60, + MemoryState::GOT_MAX, false); + state1.addToEntry(blockAlloc, + 200, 20, + MemoryState::GOT_MIN, false); + state1.addToEntry(getAlloc, 0, 15, + MemoryState::DENIED, false, 0); + state1.addToEntry(databaseAlloc, 150, 0, + MemoryState::DENIED, true, 1); + state1.addToEntry(cacheAlloc, 45, 0, + MemoryState::GOT_MAX, true, 1); + + state2.addToEntry(putAlloc, 50, 10, + MemoryState::GOT_MIN, false); + state2.addToEntry(putAlloc, 20, 40, + MemoryState::GOT_MIN, false); + + state1.removeFromEntry(databaseAlloc, 25, 0, 0); + state1.removeFromEntry(putAlloc, 100, 60); + + MemoryState::SnapShot state3; + state3 = state1.getMaxSnapshot(); + state3 += state2.getMaxSnapshot(); + + std::string expected; + expected = + "\n" + "MemoryState(Max memory: 1024) {\n" + " Current: SnapShot(Used 470, w/o cache 425) {\n" + " Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" + " DATABASE(0): Used(125 B / 1) Stats(1, 0, 0, 1, 1)\n" + " MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" + " MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" + " MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " MESSAGE_PUT(60): Used(0 B / 0) Stats(1, 1, 0, 0, 0)\n" + " SLOTFILE_CACHE(0): Used(45 B / 1) Stats(1, 1, 0, 0, 1)\n" + " }\n" + " Max: SnapShot(Used 550, w/o cache 550) {\n" + " Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" + " DATABASE(0): Used(150 B / 1) Stats(1, 0, 0, 1, 1)\n" + " MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" + " MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" + " MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " MESSAGE_PUT(60): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" + " }\n" + "}"; + + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state1.toString(true)); + expected = "\n" +"MemoryState(Max memory: 1024) {\n" +" Current: SnapShot(Used 70, w/o cache 70) {\n" +" Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" +" MESSAGE_PUT(10): Used(50 B / 1) Stats(1, 0, 1, 0, 0)\n" +" MESSAGE_PUT(40): Used(20 B / 1) Stats(1, 0, 1, 0, 0)\n" +" }\n" +"}"; + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state2.toString(true)); + expected = "\n" +"SnapShot(Used 550, w/o cache 550) {\n" +" Type(Pri): Used(Size/Allocs) Stats(Allocs, Wanted, Min, Denied, Forced)\n" +" DATABASE(0): Used(150 B / 1) Stats(1, 0, 0, 1, 1)\n" +" MESSAGE_DOCBLOCK(20): Used(200 B / 1) Stats(1, 0, 1, 0, 0)\n" +" MESSAGE_GET(15): Used(0 B / 0) Stats(1, 0, 0, 1, 0)\n" +" MESSAGE_PUT(10): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" +" MESSAGE_PUT(60): Used(100 B / 1) Stats(1, 1, 0, 0, 0)\n" +"}"; + CPPUNIT_ASSERT_EQUAL(expected, "\n" + state3.toString(true)); +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storageframework/src/tests/status/.gitignore b/storageframework/src/tests/status/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/status/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/status/CMakeLists.txt b/storageframework/src/tests/status/CMakeLists.txt new file mode 100644 index 00000000000..2c829d48810 --- /dev/null +++ b/storageframework/src/tests/status/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_teststatus + SOURCES + htmlstatustest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/status/htmlstatustest.cpp b/storageframework/src/tests/status/htmlstatustest.cpp new file mode 100644 index 00000000000..2c3ef818e8e --- /dev/null +++ b/storageframework/src/tests/status/htmlstatustest.cpp @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/vdstestlib/cppunit/macros.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> + +namespace storage { +namespace framework { + +struct HtmlStatusTest : public CppUnit::TestFixture { + + void testHtmlStatus(); + + CPPUNIT_TEST_SUITE(HtmlStatusTest); + CPPUNIT_TEST(testHtmlStatus); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(HtmlStatusTest); + +void +HtmlStatusTest::testHtmlStatus() +{ +} + +} // framework +} // storage diff --git a/storageframework/src/tests/testrunner.cpp b/storageframework/src/tests/testrunner.cpp new file mode 100644 index 00000000000..16027870c47 --- /dev/null +++ b/storageframework/src/tests/testrunner.cpp @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <iostream> +#include <vespa/log/log.h> +#include <vespa/vdstestlib/cppunit/cppunittestrunner.h> + +LOG_SETUP("persistencecppunittests"); + +int +main(int argc, char **argv) +{ + vdstestlib::CppUnitTestRunner testRunner; + return testRunner.run(argc, argv); +} diff --git a/storageframework/src/tests/thread/.gitignore b/storageframework/src/tests/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storageframework/src/tests/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storageframework/src/tests/thread/CMakeLists.txt b/storageframework/src/tests/thread/CMakeLists.txt new file mode 100644 index 00000000000..961b49d065a --- /dev/null +++ b/storageframework/src/tests/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testthread + SOURCES + tickingthreadtest.cpp + taskthreadtest.cpp + DEPENDS +) diff --git a/storageframework/src/tests/thread/taskthreadtest.cpp b/storageframework/src/tests/thread/taskthreadtest.cpp new file mode 100644 index 00000000000..6b524f10cf5 --- /dev/null +++ b/storageframework/src/tests/thread/taskthreadtest.cpp @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/storageframework/generic/thread/taskthread.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +namespace storage { +namespace framework { + +struct TaskThreadTest : public CppUnit::TestFixture +{ + + void testNormalUsage(); + + CPPUNIT_TEST_SUITE(TaskThreadTest); + CPPUNIT_TEST(testNormalUsage); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TaskThreadTest); + +namespace { + struct Task { + std::string _name; + uint8_t _priority; + + Task(const std::string& name, uint8_t priority) + : _name(name), _priority(priority) {} + + bool operator<(const Task& other) const { + return (_priority > other._priority); + } + uint8_t getPriority() const { return _priority; } + }; + + struct MyThread : public TaskThread<Task> { + MyThread(ThreadLock& lock) : TaskThread<Task>(lock) {} + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) { + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + }; +} + +void +TaskThreadTest::testNormalUsage() +{ + TickingThreadPool::UP pool(TickingThreadPool::createDefault("testApp")); + + MyThread t(*pool); + t.addTask(Task("a", 6)); + t.addTask(Task("b", 3)); + t.addTask(Task("c", 8)); + t.addTask(Task("d", 4)); + CPPUNIT_ASSERT(t.empty()); // Still empty before critical tick has run + dynamic_cast<TickingThread&>(t).doCriticalTick(0); + CPPUNIT_ASSERT(!t.empty()); + CPPUNIT_ASSERT_EQUAL(3, (int) t.peek().getPriority()); + std::ostringstream ost; + while (!t.empty()) { + Task task(t.peek()); + ost << task._name << '(' << ((int) task.getPriority()) << ") "; + t.pop(); + } + CPPUNIT_ASSERT_EQUAL(std::string("b(3) d(4) a(6) c(8) "), ost.str()); +} + + +} // framework +} // storage diff --git a/storageframework/src/tests/thread/tickingthreadtest.cpp b/storageframework/src/tests/thread/tickingthreadtest.cpp new file mode 100644 index 00000000000..ca15263d446 --- /dev/null +++ b/storageframework/src/tests/thread/tickingthreadtest.cpp @@ -0,0 +1,370 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h> +#include <vespa/storageframework/generic/thread/tickingthread.h> +#include <vespa/vdstestlib/cppunit/macros.h> + +LOG_SETUP(".tickingthreadtest"); + +namespace storage { +namespace framework { +namespace defaultimplementation { + +struct TickingThreadTest : public CppUnit::TestFixture +{ + void setUp() {} + void tearDown() {} + + void testTicksBeforeWaitBasic(); + void testTicksBeforeWaitLiveUpdate(); + void testDestroyWithoutStarting(); + void testVerboseStopping(); + void testStopOnDeletion(); + void testLockAllTicks(); + void testLockCriticalTicks(); + void testFailsOnStartWithoutThreads(); + void testBroadcast(); + + CPPUNIT_TEST_SUITE(TickingThreadTest); + CPPUNIT_TEST(testTicksBeforeWaitBasic); + CPPUNIT_TEST(testTicksBeforeWaitLiveUpdate); + CPPUNIT_TEST(testDestroyWithoutStarting); + CPPUNIT_TEST(testVerboseStopping); + CPPUNIT_TEST(testStopOnDeletion); + CPPUNIT_TEST(testLockAllTicks); + CPPUNIT_TEST(testLockCriticalTicks); + CPPUNIT_TEST(testFailsOnStartWithoutThreads); + CPPUNIT_TEST(testBroadcast); + CPPUNIT_TEST_SUITE_END(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION(TickingThreadTest); + +namespace { + +struct Context { + uint64_t _critTickCount; + uint64_t _nonCritTickCount; + + Context() : _critTickCount(0), _nonCritTickCount(0) {} +}; + +struct MyApp : public TickingThread { + uint32_t _critOverlapCounter; + bool _doCritOverlapTest; + bool _critOverlap; + std::vector<Context> _context; + TickingThreadPool::UP _threadPool; + + MyApp(int threadCount, bool doCritOverlapTest = false) + : _critOverlapCounter(0), + _doCritOverlapTest(doCritOverlapTest), + _critOverlap(false), + _threadPool(TickingThreadPool::createDefault("testApp")) + { + for (int i=0; i<threadCount; ++i) { + _threadPool->addThread(*this); + _context.push_back(Context()); + } + } + + void start(ThreadPool& p) { _threadPool->start(p); } + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex index) { + assert(index < _context.size()); + Context& c(_context[index]); + if (_doCritOverlapTest) { + uint32_t oldTick = _critOverlapCounter; + FastOS_Thread::Sleep(1); + _critOverlap |= (_critOverlapCounter != oldTick); + ++_critOverlapCounter; + } + ++c._critTickCount; + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex index) { + assert(index < _context.size()); + Context& c(_context[index]); + ++c._nonCritTickCount; + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + + uint64_t getMinCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, _context[i]._critTickCount); + } + return min; + } + uint64_t getMinNonCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, _context[i]._critTickCount); + } + return min; + } + uint64_t getTotalCritTicks() { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += _context[i]._critTickCount; + } + return total; + } + uint64_t getTotalNonCritTicks() { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += _context[i]._nonCritTickCount; + } + return total; + } + uint64_t getTotalTicks() + { return getTotalCritTicks() + getTotalNonCritTicks(); } + bool hasCritOverlap() { return _critOverlap; } +}; + +} + +void +TickingThreadTest::testTicksBeforeWaitBasic() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 1; + MyApp app(threadCount); + app.start(testReg.getThreadPoolImpl()); + + // Default behaviour is 5ms sleep before each tick. Let's do 20 ticks, + // and verify time is in right ballpark. + int totalSleepMs = 0; + while (app.getTotalNonCritTicks() < 20) { + FastOS_Thread::Sleep(1); + totalSleepMs++; + } + CPPUNIT_ASSERT(totalSleepMs > 10); + app._threadPool->stop(); +} + +void +TickingThreadTest::testTicksBeforeWaitLiveUpdate() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 1; + MyApp app(threadCount); + // Configure thread pool to send bulks of 5000 ticks each second. + long unsigned int ticksBeforeWaitMs = 5000; + MilliSecTime waitTimeMs(1000); + MilliSecTime maxProcessingTime(234234); + app.start(testReg.getThreadPoolImpl()); + app._threadPool->updateParametersAllThreads( + waitTimeMs, maxProcessingTime, ticksBeforeWaitMs); + + // Check that 5000 ticks are received instantly (usually <2 ms) + // (if live update is broken it will take more than an hour). + int maxAttempts = 120000; // a bit more than 120 secs + while (app.getTotalNonCritTicks() < ticksBeforeWaitMs && maxAttempts-->0) { + FastOS_Thread::Sleep(1); + } + + CPPUNIT_ASSERT(maxAttempts>0); + CPPUNIT_ASSERT(app.getTotalNonCritTicks() >= ticksBeforeWaitMs); + app._threadPool->stop(); +} + +void +TickingThreadTest::testDestroyWithoutStarting() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); +} + +void +TickingThreadTest::testVerboseStopping() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + FastOS_Thread::Sleep(1); + } + app._threadPool->stop(); +} + +void +TickingThreadTest::testStopOnDeletion() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + FastOS_Thread::Sleep(1); + } +} + +void +TickingThreadTest::testLockAllTicks() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + MyApp app1(threadCount); + MyApp app2(threadCount); + app1.start(testReg.getThreadPoolImpl()); + app2.start(testReg.getThreadPoolImpl()); + while (std::min(app1.getMinCritTick(), app2.getMinCritTick()) < 5) { + FastOS_Thread::Sleep(1); + } + uint64_t ticks1, ticks2; + { + TickingLockGuard guard(app1._threadPool->freezeAllTicks()); + ticks1 = app1.getTotalTicks(); + ticks2 = app2.getTotalTicks(); + + while (app2.getMinCritTick() < 2 * ticks2 / threadCount) { + FastOS_Thread::Sleep(1); + } + CPPUNIT_ASSERT_EQUAL(ticks1, app1.getTotalTicks()); + } + while (app1.getMinCritTick() < 2 * ticks1 / threadCount) { + FastOS_Thread::Sleep(1); + } +} + +void +TickingThreadTest::testLockCriticalTicks() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 5; + uint64_t iterationsBeforeOverlap = 0; + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (!app.hasCritOverlap()) { + FastOS_Thread::Sleep(1); + ++app._critOverlapCounter; + ++iterationsBeforeOverlap; + } + } + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + for (uint64_t i=0; i<iterationsBeforeOverlap * 10; ++i) { + FastOS_Thread::Sleep(1); + TickingLockGuard guard(app._threadPool->freezeCriticalTicks()); + for (int j=0; j<threadCount; ++j) { + ++app._context[j]._critTickCount; + } + CPPUNIT_ASSERT(!app.hasCritOverlap()); + } + } +} + +void +TickingThreadTest::testFailsOnStartWithoutThreads() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + int threadCount = 0; + MyApp app(threadCount, true); + try{ + app.start(testReg.getThreadPoolImpl()); + CPPUNIT_FAIL("Expected starting without threads to fail"); + } catch (vespalib::Exception& e) { + CPPUNIT_ASSERT_EQUAL(vespalib::string( + "Makes no sense to start threadpool without threads"), + e.getMessage()); + } +} + +namespace { + +RealClock clock; + +void printTaskInfo(const std::string& task, const char* action) { + vespalib::string msg = vespalib::make_string( + "%" PRIu64 ": %s %s\n", + clock.getTimeInMicros().getTime(), + task.c_str(), + action); + // std::cerr << msg; +} + +struct BroadcastApp : public TickingThread { + std::vector<std::string> _queue; + std::vector<std::string> _active; + std::vector<std::string> _processed; + TickingThreadPool::UP _threadPool; + + // Set a huge wait time by default to ensure we have to notify + BroadcastApp() + : _threadPool(TickingThreadPool::createDefault( + "testApp", MilliSecTime(300000))) + { + _threadPool->addThread(*this); + } + + void start(ThreadPool& p) { _threadPool->start(p); } + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex) { + if (!_queue.empty()) { + for (uint32_t i=0; i<_queue.size(); ++i) { + printTaskInfo(_queue[i], "activating"); + _active.push_back(_queue[i]); + } + _queue.clear(); + return ThreadWaitInfo::MORE_WORK_ENQUEUED; + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) { + if (!_active.empty()) { + for (uint32_t i=0; i<_active.size(); ++i) { + printTaskInfo(_queue[i], "processing"); + _processed.push_back(_active[i]); + } + _active.clear(); + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + + void doTask(const std::string& task) { + printTaskInfo(task, "enqueue"); + TickingLockGuard guard(_threadPool->freezeCriticalTicks()); + _queue.push_back(task); + guard.broadcast(); + } +}; + +} + + +void +TickingThreadTest::testBroadcast() +{ + TestComponentRegister testReg( + ComponentRegisterImpl::UP(new ComponentRegisterImpl)); + BroadcastApp app; + app.start(testReg.getThreadPoolImpl()); + app.doTask("foo"); + FastOS_Thread::Sleep(1); + app.doTask("bar"); + FastOS_Thread::Sleep(1); + app.doTask("baz"); + FastOS_Thread::Sleep(1); + app.doTask("hmm"); + FastOS_Thread::Sleep(1); +} + +} // defaultimplementation +} // framework +} // storage |