diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-18 22:14:24 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-18 22:14:24 +0000 |
commit | 7b0a212729d115561163b702fec8794d91b0653d (patch) | |
tree | 7f70faf19a7f27d9af422e9a08ffbaef1cb72cb7 /storage | |
parent | c5e298b28a9fdf524fe9282a3cfe9ae7aaef174f (diff) |
Collapse storageframework back into storage.
Diffstat (limited to 'storage')
72 files changed, 3371 insertions, 1 deletions
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index e45c78f68b9..fc084bf2e80 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -15,7 +15,6 @@ vespa_define_module( vespalib vdslib persistence - storageframework EXTERNAL_DEPENDS ${VESPA_GLIBC_RT_LIB} @@ -42,6 +41,14 @@ vespa_define_module( src/vespa/storage/storageutil src/vespa/storage/tools src/vespa/storage/visiting + src/vespa/storageframework/defaultimplementation/clock + src/vespa/storageframework/defaultimplementation/component + src/vespa/storageframework/defaultimplementation/thread + src/vespa/storageframework/generic/clock + src/vespa/storageframework/generic/component + src/vespa/storageframework/generic/metric + src/vespa/storageframework/generic/status + src/vespa/storageframework/generic/thread TEST_DEPENDS messagebus_messagebus-test @@ -59,6 +66,9 @@ vespa_define_module( src/tests/persistence src/tests/persistence/common src/tests/persistence/filestorage + src/tests/storageframework + src/tests/storageframework/clock + src/tests/storageframework/thread src/tests/storageserver src/tests/storageserver/rpc src/tests/visiting diff --git a/storage/src/tests/storageframework/.gitignore b/storage/src/tests/storageframework/.gitignore new file mode 100644 index 00000000000..94b0b80fe52 --- /dev/null +++ b/storage/src/tests/storageframework/.gitignore @@ -0,0 +1,6 @@ +/.depend +/Makefile +/test.vlog +/testrunner +storageframework_gtest_runner_app +storageframework_testrunner_app diff --git a/storage/src/tests/storageframework/CMakeLists.txt b/storage/src/tests/storageframework/CMakeLists.txt new file mode 100644 index 00000000000..fdda595125a --- /dev/null +++ b/storage/src/tests/storageframework/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +# Runner for unit tests written in gtest. +# NOTE: All new test classes should be added here. +vespa_add_executable(storageframework_gtest_runner_app TEST + SOURCES + gtest_runner.cpp + DEPENDS + storageframework_testclock + storageframework_testthread + GTest::GTest +) + +vespa_add_test( + NAME storageframework_gtest_runner_app + COMMAND storageframework_gtest_runner_app +) diff --git a/storage/src/tests/storageframework/clock/.gitignore b/storage/src/tests/storageframework/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/tests/storageframework/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/tests/storageframework/clock/CMakeLists.txt b/storage/src/tests/storageframework/clock/CMakeLists.txt new file mode 100644 index 00000000000..46015e48cb2 --- /dev/null +++ b/storage/src/tests/storageframework/clock/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testclock + SOURCES + timetest.cpp + DEPENDS + storageframework + GTest::GTest +) diff --git a/storage/src/tests/storageframework/clock/timetest.cpp b/storage/src/tests/storageframework/clock/timetest.cpp new file mode 100644 index 00000000000..e1270156aa0 --- /dev/null +++ b/storage/src/tests/storageframework/clock/timetest.cpp @@ -0,0 +1,59 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <gtest/gtest.h> + +namespace storage::framework::defaultimplementation { + +TEST(TimeTest, testBasics) +{ + SecondTime timeSec(1); + + MilliSecTime timeMillis = timeSec.getMillis(); + EXPECT_EQ(uint64_t(1000), timeMillis.getTime()); + EXPECT_EQ(timeSec, timeMillis.getSeconds()); + + MicroSecTime timeMicros = timeSec.getMicros(); + EXPECT_EQ(timeSec.getMicros(), timeMillis.getMicros()); + EXPECT_EQ(timeMillis, timeMicros.getMillis()); + EXPECT_EQ(timeSec, timeMicros.getSeconds()); + + MicroSecTime timeMicros2 = timeMicros; + EXPECT_EQ(timeMicros2, timeMicros); + timeMicros2 += MicroSecTime(25000); + EXPECT_GT(timeMicros2, timeMicros); + EXPECT_LT(timeMicros, timeMicros2); + timeMicros2 -= MicroSecTime(30000); + EXPECT_LT(timeMicros2, timeMicros); + EXPECT_GT(timeMicros, timeMicros2); + timeMicros2 += MicroSecTime(55000); + + MilliSecTime timeMillis2 = timeMicros2.getMillis(); + EXPECT_GT(timeMillis2, timeMillis); + EXPECT_EQ(uint64_t(1050), timeMillis2.getTime()); + EXPECT_EQ(timeSec, timeMillis2.getSeconds()); +} + +TEST(TimeTest, testCreatedFromClock) +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInSeconds(600); + + EXPECT_EQ(SecondTime(600), SecondTime(clock)); + EXPECT_EQ(MilliSecTime(600 * 1000), MilliSecTime(clock)); + EXPECT_EQ(MicroSecTime(600 * 1000 * 1000), MicroSecTime(clock)); +} + +TEST(TimeTest, canAssignMicrosecondResolutionTimeToFakeClock) +{ + defaultimplementation::FakeClock clock; + clock.setAbsoluteTimeInMicroSeconds(1234567); // 1.234567 seconds + + // All non-microsec time points must necessarily be truncated. + EXPECT_EQ(SecondTime(1), SecondTime(clock)); + EXPECT_EQ(MilliSecTime(1234), MilliSecTime(clock)); + EXPECT_EQ(MicroSecTime(1234567), MicroSecTime(clock)); +} + +} diff --git a/storage/src/tests/storageframework/gtest_runner.cpp b/storage/src/tests/storageframework/gtest_runner.cpp new file mode 100644 index 00000000000..105477e0b4b --- /dev/null +++ b/storage/src/tests/storageframework/gtest_runner.cpp @@ -0,0 +1,8 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/gtest/gtest.h> + +#include <vespa/log/log.h> +LOG_SETUP("storageframework_gtest_runner"); + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/storage/src/tests/storageframework/thread/.gitignore b/storage/src/tests/storageframework/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/tests/storageframework/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/tests/storageframework/thread/CMakeLists.txt b/storage/src/tests/storageframework/thread/CMakeLists.txt new file mode 100644 index 00000000000..d50f5933c51 --- /dev/null +++ b/storage/src/tests/storageframework/thread/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_testthread + SOURCES + tickingthreadtest.cpp + taskthreadtest.cpp + DEPENDS + storageframework + GTest::GTest +) diff --git a/storage/src/tests/storageframework/thread/taskthreadtest.cpp b/storage/src/tests/storageframework/thread/taskthreadtest.cpp new file mode 100644 index 00000000000..32bc6e8a8fb --- /dev/null +++ b/storage/src/tests/storageframework/thread/taskthreadtest.cpp @@ -0,0 +1,54 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageframework/generic/thread/taskthread.h> +#include <vespa/vespalib/gtest/gtest.h> + +namespace storage::framework { + +namespace { + +struct Task { + std::string _name; + uint8_t _priority; + + Task(const std::string& name, uint8_t priority) + : _name(name), _priority(priority) {} + + bool operator<(const Task& other) const { + return (_priority > other._priority); + } + uint8_t getPriority() const { return _priority; } +}; + +struct MyThread : public TaskThread<Task> { + MyThread(ThreadLock& lock) : TaskThread<Task>(lock) {} + ThreadWaitInfo doNonCriticalTick(ThreadIndex) override { + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } +}; + +} + +TEST(TaskThreadTest, test_normal_usage) +{ + TickingThreadPool::UP pool(TickingThreadPool::createDefault("testApp", 100ms)); + + MyThread t(*pool); + t.addTask(Task("a", 6)); + t.addTask(Task("b", 3)); + t.addTask(Task("c", 8)); + t.addTask(Task("d", 4)); + EXPECT_TRUE(t.empty()); // Still empty before critical tick has run + dynamic_cast<TickingThread&>(t).doCriticalTick(0); + ASSERT_TRUE(!t.empty()); + EXPECT_EQ(3, t.peek().getPriority()); + std::ostringstream ost; + while (!t.empty()) { + Task task(t.peek()); + ost << task._name << '(' << ((int) task.getPriority()) << ") "; + t.pop(); + } + EXPECT_EQ(std::string("b(3) d(4) a(6) c(8) "), ost.str()); +} + +} diff --git a/storage/src/tests/storageframework/thread/tickingthreadtest.cpp b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp new file mode 100644 index 00000000000..577e9d128b9 --- /dev/null +++ b/storage/src/tests/storageframework/thread/tickingthreadtest.cpp @@ -0,0 +1,290 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageframework/defaultimplementation/clock/realclock.h> +#include <vespa/storageframework/defaultimplementation/component/testcomponentregister.h> +#include <vespa/storageframework/generic/thread/tickingthread.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/atomic.h> +#include <thread> + +using namespace vespalib::atomic; + +namespace storage::framework::defaultimplementation { + +namespace { + +struct Context { + std::atomic<uint64_t> _critTickCount; + std::atomic<uint64_t> _nonCritTickCount; + + constexpr Context() noexcept : _critTickCount(0), _nonCritTickCount(0) {} + Context(const Context& rhs) noexcept + : _critTickCount(load_relaxed(rhs._critTickCount)), + _nonCritTickCount(load_relaxed(rhs._nonCritTickCount)) + {} +}; + +struct MyApp : public TickingThread { + std::atomic<uint32_t> _critOverlapCounter; + std::atomic<bool> _critOverlap; + bool _doCritOverlapTest; + std::vector<Context> _context; + TickingThreadPool::UP _threadPool; + + MyApp(int threadCount, bool doCritOverlapTest = false); + ~MyApp() override; + + void start(ThreadPool& p) { _threadPool->start(p); } + + ThreadWaitInfo doCriticalTick(ThreadIndex index) override { + assert(index < _context.size()); + Context& c(_context[index]); + if (_doCritOverlapTest) { + uint32_t oldTick = load_relaxed(_critOverlapCounter); + std::this_thread::sleep_for(1ms); + store_relaxed(_critOverlap, load_relaxed(_critOverlap) || (load_relaxed(_critOverlapCounter) != oldTick)); + _critOverlapCounter.fetch_add(1, std::memory_order_relaxed); + } + c._critTickCount.fetch_add(1, std::memory_order_relaxed); + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + ThreadWaitInfo doNonCriticalTick(ThreadIndex index) override { + assert(index < _context.size()); + Context& c(_context[index]); + c._nonCritTickCount.fetch_add(1, std::memory_order_relaxed); + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + uint64_t getMinCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, load_relaxed(_context[i]._critTickCount)); + } + return min; + } + uint64_t getMinNonCritTick() { + uint64_t min = std::numeric_limits<uint64_t>().max(); + for (uint32_t i=0; i<_context.size(); ++i) { + min = std::min(min, load_relaxed(_context[i]._critTickCount)); + } + return min; + } + uint64_t getTotalCritTicks() const noexcept { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += load_relaxed(_context[i]._critTickCount); + } + return total; + } + uint64_t getTotalNonCritTicks() const noexcept { + uint64_t total = 0; + for (uint32_t i=0; i<_context.size(); ++i) { + total += load_relaxed(_context[i]._nonCritTickCount); + } + return total; + } + uint64_t getTotalTicks() const noexcept { + return getTotalCritTicks() + getTotalNonCritTicks(); + } + bool hasCritOverlap() const noexcept { return load_relaxed(_critOverlap); } +}; + +MyApp::MyApp(int threadCount, bool doCritOverlapTest) + : _critOverlapCounter(0), + _critOverlap(false), + _doCritOverlapTest(doCritOverlapTest), + _threadPool(TickingThreadPool::createDefault("testApp", 100ms)) +{ + for (int i=0; i<threadCount; ++i) { + _threadPool->addThread(*this); + _context.emplace_back(); + } +} + +MyApp::~MyApp() = default; + +} + +TEST(TickingThreadTest, test_ticks_before_wait_basic) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 1; + MyApp app(threadCount); + app.start(testReg.getThreadPoolImpl()); + + // Default behaviour is 5ms sleep before each tick. Let's do 20 ticks, + // and verify time is in right ballpark. + int totalSleepMs = 0; + while (app.getTotalNonCritTicks() < 20) { + std::this_thread::sleep_for(1ms); + totalSleepMs++; + } + EXPECT_GT(totalSleepMs, 10); + app._threadPool->stop(); +} + +TEST(TickingThreadTest, test_destroy_without_starting) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 5; + MyApp app(threadCount, true); +} + +TEST(TickingThreadTest, test_verbose_stopping) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + std::this_thread::sleep_for(1ms); + } + app._threadPool->stop(); +} + +TEST(TickingThreadTest, test_stop_on_deletion) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 5; + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (app.getMinCritTick() < 5) { + std::this_thread::sleep_for(1ms); + } +} + +TEST(TickingThreadTest, test_lock_all_ticks) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 5; + MyApp app1(threadCount); + MyApp app2(threadCount); + app1.start(testReg.getThreadPoolImpl()); + app2.start(testReg.getThreadPoolImpl()); + while (std::min(app1.getMinCritTick(), app2.getMinCritTick()) < 5) { + std::this_thread::sleep_for(1ms); + } + uint64_t ticks1, ticks2; + { + TickingLockGuard guard(app1._threadPool->freezeAllTicks()); + ticks1 = app1.getTotalTicks(); + ticks2 = app2.getTotalTicks(); + + while (app2.getMinCritTick() < 2 * ticks2 / threadCount) { + std::this_thread::sleep_for(1ms); + } + EXPECT_EQ(ticks1, app1.getTotalTicks()); + } + while (app1.getMinCritTick() < 2 * ticks1 / threadCount) { + std::this_thread::sleep_for(1ms); + } +} + +TEST(TickingThreadTest, test_lock_critical_ticks) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + int threadCount = 5; + uint64_t iterationsBeforeOverlap = 0; + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + while (!app.hasCritOverlap()) { + std::this_thread::sleep_for(1ms); + app._critOverlapCounter.fetch_add(1, std::memory_order_relaxed); + ++iterationsBeforeOverlap; + } + } + { + MyApp app(threadCount, true); + app.start(testReg.getThreadPoolImpl()); + for (uint64_t i=0; i<iterationsBeforeOverlap * 10; ++i) { + std::this_thread::sleep_for(1ms); + TickingLockGuard guard(app._threadPool->freezeCriticalTicks()); + for (int j=0; j<threadCount; ++j) { + ++app._context[j]._critTickCount; + } + EXPECT_TRUE(!app.hasCritOverlap()); + } + } +} + +namespace { + +RealClock clock; + +void printTaskInfo(const std::string& task, const char* action) { + vespalib::string msg = vespalib::make_string( + "%" PRIu64 ": %s %s\n", + clock.getTimeInMicros().getTime(), + task.c_str(), + action); + // std::cerr << msg; +} + +struct BroadcastApp : public TickingThread { + std::vector<std::string> _queue; + std::vector<std::string> _active; + std::vector<std::string> _processed; + TickingThreadPool::UP _threadPool; + + // Set a huge wait time by default to ensure we have to notify + BroadcastApp(); + ~BroadcastApp(); + + void start(ThreadPool& p) { _threadPool->start(p); } + + ThreadWaitInfo doCriticalTick(ThreadIndex) override { + if (!_queue.empty()) { + for (uint32_t i=0; i<_queue.size(); ++i) { + printTaskInfo(_queue[i], "activating"); + _active.push_back(_queue[i]); + } + _queue.clear(); + return ThreadWaitInfo::MORE_WORK_ENQUEUED; + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + ThreadWaitInfo doNonCriticalTick(ThreadIndex) override { + if (!_active.empty()) { + for (uint32_t i=0; i<_active.size(); ++i) { + printTaskInfo(_active[i], "processing"); + _processed.push_back(_active[i]); + } + _active.clear(); + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + } + + void doTask(const std::string& task) { + printTaskInfo(task, "enqueue"); + TickingLockGuard guard(_threadPool->freezeCriticalTicks()); + _queue.push_back(task); + guard.broadcast(); + } +}; + +BroadcastApp::BroadcastApp() + : _threadPool(TickingThreadPool::createDefault("testApp", 300s)) +{ + _threadPool->addThread(*this); +} +BroadcastApp::~BroadcastApp() = default; + +} + +TEST(TickingThreadTest, test_broadcast) +{ + TestComponentRegister testReg(std::make_unique<ComponentRegisterImpl>()); + BroadcastApp app; + app.start(testReg.getThreadPoolImpl()); + app.doTask("foo"); + std::this_thread::sleep_for(1ms); + app.doTask("bar"); + std::this_thread::sleep_for(1ms); + app.doTask("baz"); + std::this_thread::sleep_for(1ms); + app.doTask("hmm"); + std::this_thread::sleep_for(1ms); +} + +} diff --git a/storage/src/vespa/storage/CMakeLists.txt b/storage/src/vespa/storage/CMakeLists.txt index 679245b06d1..22aad2a147d 100644 --- a/storage/src/vespa/storage/CMakeLists.txt +++ b/storage/src/vespa/storage/CMakeLists.txt @@ -20,6 +20,13 @@ vespa_add_library(storage $<TARGET_OBJECTS:storage_distributormaintenance> $<TARGET_OBJECTS:storage_distributor_bucketdb> $<TARGET_OBJECTS:storage_distributor> + $<TARGET_OBJECTS:storageframework_component> + $<TARGET_OBJECTS:storageframework_status> + $<TARGET_OBJECTS:storageframework_thread> + $<TARGET_OBJECTS:storageframework_clock> + $<TARGET_OBJECTS:storageframework_clockimpl> + $<TARGET_OBJECTS:storageframework_componentimpl> + $<TARGET_OBJECTS:storageframework_threadimpl> INSTALL lib64 DEPENDS ) diff --git a/storage/src/vespa/storageframework/.gitignore b/storage/src/vespa/storageframework/.gitignore new file mode 100644 index 00000000000..fb3ac588d47 --- /dev/null +++ b/storage/src/vespa/storageframework/.gitignore @@ -0,0 +1,4 @@ +/.depend +/Makefile +/features.h +/libstorageframework.so.5.1 diff --git a/storage/src/vespa/storageframework/defaultimplementation/.gitignore b/storage/src/vespa/storageframework/defaultimplementation/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/.gitignore b/storage/src/vespa/storageframework/defaultimplementation/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt b/storage/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt new file mode 100644 index 00000000000..9e22f36e6c3 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_clockimpl OBJECT + SOURCES + realclock.cpp + fakeclock.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp new file mode 100644 index 00000000000..d02cecd79bb --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.cpp @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "fakeclock.h" + +namespace storage { +namespace framework { +namespace defaultimplementation { + +FakeClock::FakeClock(Mode m, framework::MicroSecTime startTime) + : _mode(m), + _absoluteTime(startTime), + _cycleCount(0) +{ +} + +} // defaultimplementation +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h new file mode 100644 index 00000000000..f4cdcf2b8ce --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/fakeclock.h @@ -0,0 +1,82 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::FakeClock + * \ingroup test + * + * \brief Implements a fake clock to use for testing. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/clock.h> +#include <mutex> + +namespace storage::framework::defaultimplementation { + +struct FakeClock : public framework::Clock { + enum Mode { + FAKE_ABSOLUTE, // Time is always equal to supplied absolute time + FAKE_ABSOLUTE_CYCLE // Time is equal to absolute time + counter that + // increases for each request so you never get same + // timestamp twice. + }; + +private: + Mode _mode; + framework::MicroSecTime _absoluteTime; + mutable time_t _cycleCount; + mutable std::mutex _lock; + +public: + FakeClock(Mode m = FAKE_ABSOLUTE, + framework::MicroSecTime startTime = framework::MicroSecTime(1)); + + void setMode(Mode m) { + std::lock_guard guard(_lock); + _mode = m; + } + virtual void setFakeCycleMode() { setMode(FAKE_ABSOLUTE_CYCLE); } + + virtual void setAbsoluteTimeInSeconds(uint32_t seconds) { + std::lock_guard guard(_lock); + _absoluteTime = framework::MicroSecTime(seconds * uint64_t(1000000)); + _cycleCount = 0; + _mode = FAKE_ABSOLUTE; + } + + virtual void setAbsoluteTimeInMicroSeconds(uint64_t usecs) { + std::lock_guard guard(_lock); + _absoluteTime = framework::MicroSecTime(usecs); + _cycleCount = 0; + _mode = FAKE_ABSOLUTE; + } + + virtual void addMilliSecondsToTime(uint64_t ms) { + std::lock_guard guard(_lock); + _absoluteTime += framework::MicroSecTime(ms * 1000); + } + + virtual void addSecondsToTime(uint32_t nr) { + std::lock_guard guard(_lock); + _absoluteTime += framework::MicroSecTime(nr * uint64_t(1000000)); + } + + framework::MicroSecTime getTimeInMicros() const override { + std::lock_guard guard(_lock); + if (_mode == FAKE_ABSOLUTE) return _absoluteTime; + return _absoluteTime + framework::MicroSecTime(1000000 * _cycleCount++); + } + framework::MilliSecTime getTimeInMillis() const override { + return getTimeInMicros().getMillis(); + } + framework::SecondTime getTimeInSeconds() const override { + return getTimeInMicros().getSeconds(); + } + framework::MonotonicTimePoint getMonotonicTime() const override { + // For simplicity, assume fake monotonic time follows fake wall clock. + return MonotonicTimePoint(std::chrono::microseconds( + getTimeInMicros().getTime())); + } +}; + +} + diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp new file mode 100644 index 00000000000..0303481feb5 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.cpp @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "realclock.h" +#include <sys/time.h> + +namespace storage::framework::defaultimplementation { + +MicroSecTime RealClock::getTimeInMicros() const { + struct timeval mytime; + gettimeofday(&mytime, 0); + return MicroSecTime(mytime.tv_sec * 1000000llu + mytime.tv_usec); +} + +MilliSecTime RealClock::getTimeInMillis() const { + struct timeval mytime; + gettimeofday(&mytime, 0); + return MilliSecTime( + mytime.tv_sec * 1000llu + mytime.tv_usec / 1000); +} + +SecondTime RealClock::getTimeInSeconds() const { + struct timeval mytime; + gettimeofday(&mytime, 0); + return SecondTime(mytime.tv_sec); +} + +MonotonicTimePoint RealClock::getMonotonicTime() const { + return std::chrono::steady_clock::now(); +} + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h new file mode 100644 index 00000000000..a4b80a990c9 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::RealClock + * \ingroup frameworkimpl + * + * \brief Implements a class for calculating current time. + * + * Real implementation for gathering all clock information used in application. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/clock.h> + +namespace storage::framework::defaultimplementation { + +struct RealClock : public Clock { + MicroSecTime getTimeInMicros() const override; + MilliSecTime getTimeInMillis() const override; + SecondTime getTimeInSeconds() const override; + MonotonicTimePoint getMonotonicTime() const override; +}; + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/.gitignore b/storage/src/vespa/storageframework/defaultimplementation/component/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt b/storage/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt new file mode 100644 index 00000000000..5181d709dc3 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_componentimpl OBJECT + SOURCES + componentregisterimpl.cpp + testcomponentregister.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp new file mode 100644 index 00000000000..0da02b15a11 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.cpp @@ -0,0 +1,169 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "componentregisterimpl.h" +#include <vespa/storageframework/storageframework.h> +#include <vespa/metrics/metricmanager.h> +#include <vespa/vespalib/util/exceptions.h> +#include <cassert> + +namespace storage::framework::defaultimplementation { + +ComponentRegisterImpl::ComponentRegisterImpl() + : _componentLock(), + _components(), + _topMetricSet("vds", {}, ""), + _hooks(), + _metricManager(nullptr), + _clock(nullptr), + _threadPool(nullptr), + _upgradeFlag(NO_UPGRADE_SPECIAL_HANDLING_ACTIVE), + _shutdownListener(nullptr) +{ } + +ComponentRegisterImpl::~ComponentRegisterImpl() = default; + +void +ComponentRegisterImpl::registerComponent(ManagedComponent& mc) +{ + std::lock_guard lock(_componentLock); + _components.push_back(&mc); + if (_clock) { + mc.setClock(*_clock); + } + if (_threadPool) { + mc.setThreadPool(*_threadPool); + } + if (_metricManager) { + mc.setMetricRegistrator(*this); + } + mc.setUpgradeFlag(_upgradeFlag); +} + +void +ComponentRegisterImpl::requestShutdown(vespalib::stringref reason) +{ + std::lock_guard lock(_componentLock); + if (_shutdownListener) { + _shutdownListener->requestShutdown(reason); + } +} + +void +ComponentRegisterImpl::setMetricManager(metrics::MetricManager& mm) +{ + std::vector<ManagedComponent*> components; + { + std::lock_guard lock(_componentLock); + assert(_metricManager == nullptr); + components = _components; + _metricManager = &mm; + } + { + metrics::MetricLockGuard lock(mm.getMetricLock()); + mm.registerMetric(lock, _topMetricSet); + } + for (auto* component : _components) { + component->setMetricRegistrator(*this); + } +} + +void +ComponentRegisterImpl::setClock(Clock& c) +{ + std::lock_guard lock(_componentLock); + assert(_clock == nullptr); + _clock = &c; + for (auto* component : _components) { + component->setClock(c); + } +} + +void +ComponentRegisterImpl::setThreadPool(ThreadPool& tp) +{ + std::lock_guard lock(_componentLock); + assert(_threadPool == nullptr); + _threadPool = &tp; + for (auto* component : _components) { + component->setThreadPool(tp); + } +} + +void +ComponentRegisterImpl::setUpgradeFlag(UpgradeFlags flag) +{ + std::lock_guard lock(_componentLock); + _upgradeFlag = flag; + for (auto* component : _components) { + component->setUpgradeFlag(_upgradeFlag); + } +} + +const StatusReporter* +ComponentRegisterImpl::getStatusReporter(vespalib::stringref id) +{ + std::lock_guard lock(_componentLock); + for (auto* component : _components) { + if ((component->getStatusReporter() != nullptr) + && (component->getStatusReporter()->getId() == id)) + { + return component->getStatusReporter(); + } + } + return nullptr; +} + +std::vector<const StatusReporter*> +ComponentRegisterImpl::getStatusReporters() +{ + std::vector<const StatusReporter*> reporters; + std::lock_guard lock(_componentLock); + for (auto* component : _components) { + if (component->getStatusReporter() != nullptr) { + reporters.emplace_back(component->getStatusReporter()); + } + } + return reporters; +} + +void +ComponentRegisterImpl::registerMetric(metrics::Metric& m) +{ + metrics::MetricLockGuard lock(_metricManager->getMetricLock()); + _topMetricSet.registerMetric(m); +} + +namespace { + struct MetricHookWrapper : public metrics::UpdateHook { + MetricUpdateHook& _hook; + + MetricHookWrapper(vespalib::stringref name, MetricUpdateHook& hook) + : metrics::UpdateHook(name.data()), // Expected to point to static name + _hook(hook) + { + } + + void updateMetrics(const MetricLockGuard & guard) override { _hook.updateMetrics(guard); } + }; +} + +void +ComponentRegisterImpl::registerUpdateHook(vespalib::stringref name, + MetricUpdateHook& hook, + SecondTime period) +{ + std::lock_guard lock(_componentLock); + auto hookPtr = std::make_unique<MetricHookWrapper>(name, hook); + _metricManager->addMetricUpdateHook(*hookPtr, period.getTime()); + _hooks.emplace_back(std::move(hookPtr)); +} + +void +ComponentRegisterImpl::registerShutdownListener(ShutdownListener& listener) +{ + std::lock_guard lock(_componentLock); + assert(_shutdownListener == nullptr); + _shutdownListener = &listener; +} + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h b/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h new file mode 100644 index 00000000000..6cfae8086fc --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/componentregisterimpl.h @@ -0,0 +1,83 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::ComponentRegisterImpl + * \ingroup component + * + * \brief Application server uses this class to manage components. + * + * This class implements set functions for the various implementations needed. + * It will set these implementations in all components already registered, and + * in components registered after that. Simplifies login in application server + * as it can just instantiate components in some order and set implementations + * as soon as they exist. + * + * It is possibly to subclass this implementation. That is useful if you also + * subclass component class to provide extra functionality. Then you can handle + * that extra functionality in the subclass. + */ +#pragma once + +#include <vespa/storageframework/generic/component/componentregister.h> +#include <vespa/storageframework/generic/component/managedcomponent.h> +#include <vespa/storageframework/generic/metric/metricregistrator.h> +#include <vespa/storageframework/generic/status/statusreportermap.h> +#include <vespa/metrics/metricset.h> +#include <mutex> + +namespace metrics { + + class MetricManager; + class UpdateHook; + +} + + +namespace storage::framework::defaultimplementation { + +struct ShutdownListener { + virtual ~ShutdownListener() {} + virtual void requestShutdown(vespalib::stringref reason) = 0; +}; + +class ComponentRegisterImpl : public virtual ComponentRegister, + public StatusReporterMap, + public MetricRegistrator +{ + std::mutex _componentLock; + std::vector<ManagedComponent*> _components; + + metrics::MetricSet _topMetricSet; + std::vector<std::unique_ptr<metrics::UpdateHook>> _hooks; + metrics::MetricManager* _metricManager; + Clock* _clock; + ThreadPool* _threadPool; + UpgradeFlags _upgradeFlag; + ShutdownListener* _shutdownListener; + +public: + typedef std::unique_ptr<ComponentRegisterImpl> UP; + + ComponentRegisterImpl(); + ~ComponentRegisterImpl() override; + + bool hasMetricManager() const { return (_metricManager != 0); } + metrics::MetricManager& getMetricManager() { return *_metricManager; } + + void registerComponent(ManagedComponent&) override; + void requestShutdown(vespalib::stringref reason) override; + + void setMetricManager(metrics::MetricManager&); + void setClock(Clock&); + void setThreadPool(ThreadPool&); + void setUpgradeFlag(UpgradeFlags flag); + + const StatusReporter* getStatusReporter(vespalib::stringref id) override; + std::vector<const StatusReporter*> getStatusReporters() override; + + void registerMetric(metrics::Metric&) override; + void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) override; + void registerShutdownListener(ShutdownListener&); + +}; + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp new file mode 100644 index 00000000000..945e264aaea --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.cpp @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "testcomponentregister.h" +#include <cassert> + +namespace storage::framework::defaultimplementation { + +TestComponentRegister::TestComponentRegister(ComponentRegisterImpl::UP compReg) + : _compReg(std::move(compReg)), + _clock(), + _threadPool(_clock) +{ + assert(_compReg.get() != 0); + // Set a fake clock, giving test control of clock + _compReg->setClock(_clock); + // Set a thread pool so components can make threads in tests. + _compReg->setThreadPool(_threadPool); + // Metric manager should not be needed. Tests of metric system can + // be done without using this class. Components can still register + // metrics without a manager. + + // Status page server should not be needed. Tests of status parts + // can be done without using this class. Components can still + // register status pages without a server +} + +TestComponentRegister::~TestComponentRegister() = default; + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h new file mode 100644 index 00000000000..bd4afa6c9ad --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/component/testcomponentregister.h @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::TestComponentRegister + * \ingroup component + * + * \brief Simple instance to use for testing. + * + * For testing we just want to set up a simple component register with the basic + * services that tests need, and that all tests need the same instance of. + * + * This instance should be the same for all using it. So don't add set functions + * that can possibly alter it while running. + */ +#pragma once + +#include "componentregisterimpl.h" +#include <vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> + +namespace storage::framework::defaultimplementation { + +class TestComponentRegister { + ComponentRegisterImpl::UP _compReg; + FakeClock _clock; + ThreadPoolImpl _threadPool; + +public: + TestComponentRegister(ComponentRegisterImpl::UP compReg); + ~TestComponentRegister(); + + ComponentRegisterImpl& getComponentRegister() { return *_compReg; } + FakeClock& getClock() { return _clock; } + ThreadPoolImpl& getThreadPoolImpl() { return _threadPool; } + FastOS_ThreadPool& getThreadPool() { return _threadPool.getThreadPool(); } +}; + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/.gitignore b/storage/src/vespa/storageframework/defaultimplementation/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt b/storage/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt new file mode 100644 index 00000000000..950bc527ece --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_threadimpl OBJECT + SOURCES + threadimpl.cpp + threadpoolimpl.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp new file mode 100644 index 00000000000..b8ef8e4610b --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.cpp @@ -0,0 +1,144 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "threadimpl.h" +#include "threadpoolimpl.h" +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/vespalib/util/atomic.h> + +#include <vespa/log/bufferedlogger.h> +LOG_SETUP(".framework.thread.impl"); + +using namespace vespalib::atomic; + +namespace storage::framework::defaultimplementation { + +ThreadImpl::ThreadImpl(ThreadPoolImpl& pool, + Runnable& runnable, + vespalib::stringref id, + vespalib::duration waitTime, + vespalib::duration maxProcessTime, + int ticksBeforeWait, + std::optional<vespalib::CpuUsage::Category> cpu_category) + : Thread(id), + _pool(pool), + _runnable(runnable), + _properties(waitTime, maxProcessTime, ticksBeforeWait), + _tickData(), + _tickDataPtr(0), + _interrupted(false), + _joined(false), + _thread(*this), + _cpu_category(cpu_category) +{ + _tickData[load_relaxed(_tickDataPtr)]._lastTick = pool.getClock().getMonotonicTime(); + _thread.start(_pool.getThreadPool()); +} + +ThreadImpl::~ThreadImpl() +{ + interrupt(); + join(); +} + +void +ThreadImpl::run() +{ + if (_cpu_category.has_value()) { + auto usage = vespalib::CpuUsage::use(_cpu_category.value()); + _runnable.run(*this); + } else { + _runnable.run(*this); + } + _pool.unregisterThread(*this); + _joined = true; +} + +bool +ThreadImpl::interrupted() const +{ + return _interrupted.load(std::memory_order_relaxed); +} + +bool +ThreadImpl::joined() const +{ + return _joined; +} + +void +ThreadImpl::interrupt() +{ + _interrupted.store(true, std::memory_order_relaxed); + _thread.stop(); +} + +void +ThreadImpl::join() +{ + _thread.join(); +} + +void +ThreadImpl::registerTick(CycleType cycleType, vespalib::steady_time now) +{ + if (now.time_since_epoch() == vespalib::duration::zero()) now = _pool.getClock().getMonotonicTime(); + ThreadTickData data(getTickData()); + vespalib::steady_clock::time_point previousTick = data._lastTick; + data._lastTick = now; + data._lastTickType = cycleType; + setTickData(data); + + if (data._lastTick.time_since_epoch() == vespalib::duration::zero()) { return; } + + if (previousTick > now) { + LOGBP(warning, "Thread is registering tick at time %" PRIu64 ", but " + "last time it registered a tick, the time was %" PRIu64 + ". Assuming clock has been adjusted backwards", + vespalib::count_ms(now.time_since_epoch()), vespalib::count_ms(previousTick.time_since_epoch())); + return; + } + vespalib::duration cycleTime = now - previousTick; + if (cycleType == WAIT_CYCLE) { + data._maxWaitTimeSeen = std::max(data._maxWaitTimeSeen, cycleTime); + } else { + data._maxProcessingTimeSeen = std::max(data._maxProcessingTimeSeen, cycleTime); + } +} + +ThreadTickData +ThreadImpl::getTickData() const +{ + return _tickData[load_acquire(_tickDataPtr)].loadRelaxed(); +} + +void +ThreadImpl::setTickData(const ThreadTickData& tickData) +{ + uint32_t nextData = (load_relaxed(_tickDataPtr) + 1) % _tickData.size(); + _tickData[nextData].storeRelaxed(tickData); + store_release(_tickDataPtr, nextData); +} + +ThreadTickData +ThreadImpl::AtomicThreadTickData::loadRelaxed() const noexcept +{ + ThreadTickData result; + constexpr auto relaxed = std::memory_order_relaxed; + result._lastTickType = _lastTickType.load(relaxed); + result._lastTick = _lastTick.load(relaxed); + result._maxProcessingTimeSeen = _maxProcessingTimeSeen.load(relaxed); + result._maxWaitTimeSeen = _maxWaitTimeSeen.load(relaxed); + return result; +} + +void +ThreadImpl::AtomicThreadTickData::storeRelaxed(const ThreadTickData& newState) noexcept +{ + constexpr auto relaxed = std::memory_order_relaxed; + _lastTickType.store(newState._lastTickType, relaxed); + _lastTick.store(newState._lastTick, relaxed); + _maxProcessingTimeSeen.store(newState._maxProcessingTimeSeen, relaxed); + _maxWaitTimeSeen.store(newState._maxWaitTimeSeen, relaxed); +} + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h new file mode 100644 index 00000000000..46a9412bf67 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h @@ -0,0 +1,83 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/storageframework/generic/thread/threadpool.h> +#include <vespa/vespalib/util/cpu_usage.h> +#include <vespa/vespalib/util/document_runnable.h> +#include <array> +#include <atomic> +#include <optional> + +namespace storage::framework::defaultimplementation { + +struct ThreadPoolImpl; + +class ThreadImpl : public Thread +{ + struct BackendThread : public document::Runnable { + ThreadImpl& _impl; + BackendThread(ThreadImpl& impl) : _impl(impl) {} + void run() override { _impl.run(); } + }; + + /** + * Internal data race free implementation of tick data that maps to and + * from ThreadTickData. We hide the atomicity of this since atomic vars + * are not CopyConstructible and thus would impose unnecessary limitations + * on code using it. + */ + struct AtomicThreadTickData { + AtomicThreadTickData() noexcept + : _lastTickType(), + _lastTick(vespalib::steady_time(vespalib::duration::zero())), + _maxProcessingTimeSeen(), + _maxWaitTimeSeen() + {} + std::atomic<CycleType> _lastTickType; + std::atomic<vespalib::steady_time> _lastTick; + std::atomic<vespalib::duration> _maxProcessingTimeSeen; + std::atomic<vespalib::duration> _maxWaitTimeSeen; + // struct stores and loads are both data race free with relaxed + // memory semantics. This means it's possible to observe stale/partial + // state in a case with concurrent readers/writers. + ThreadTickData loadRelaxed() const noexcept; + void storeRelaxed(const ThreadTickData& newState) noexcept; + }; + + ThreadPoolImpl& _pool; + Runnable& _runnable; + ThreadProperties _properties; + std::array<AtomicThreadTickData, 3> _tickData; + std::atomic<uint32_t> _tickDataPtr; + std::atomic<bool> _interrupted; + bool _joined; + BackendThread _thread; + std::optional<vespalib::CpuUsage::Category> _cpu_category; + + void run(); + +public: + ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait, + std::optional<vespalib::CpuUsage::Category> cpu_category); + ~ThreadImpl(); + + bool interrupted() const override; + bool joined() const override; + void interrupt() override; + void join() override; + void registerTick(CycleType, vespalib::steady_time) override; + vespalib::duration getWaitTime() const override { + return _properties.getWaitTime(); + } + int getTicksBeforeWait() const override { + return _properties.getTicksBeforeWait(); + } + + void setTickData(const ThreadTickData&); + ThreadTickData getTickData() const; + const ThreadProperties& getProperties() const { return _properties; } +}; + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp new file mode 100644 index 00000000000..7711eddf51c --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp @@ -0,0 +1,86 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "threadpoolimpl.h" +#include "threadimpl.h" +#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/size_literals.h> +#include <thread> +#include <vespa/log/log.h> +LOG_SETUP(".storageframework.thread_pool_impl"); + +using namespace std::chrono_literals; +using vespalib::IllegalStateException; + +namespace storage::framework::defaultimplementation { + +ThreadPoolImpl::ThreadPoolImpl(Clock& clock) + : _backendThreadPool(512_Ki), + _clock(clock), + _stopping(false) +{ } + +ThreadPoolImpl::~ThreadPoolImpl() +{ + { + std::lock_guard lock(_threadVectorLock); + _stopping = true; + for (ThreadImpl * thread : _threads) { + thread->interrupt(); + } + for (ThreadImpl * thread : _threads) { + thread->join(); + } + } + for (uint32_t i=0; true; i+=10) { + { + std::lock_guard lock(_threadVectorLock); + if (_threads.empty()) break; + } + if (i > 1000) { + fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And if allowing thread pool object" + " to be deleted this will create a segfault later)\n"); + LOG_ABORT("should not be reached"); + } + std::this_thread::sleep_for(10ms); + } + _backendThreadPool.Close(); +} + +Thread::UP +ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait, + std::optional<vespalib::CpuUsage::Category> cpu_category) +{ + std::lock_guard lock(_threadVectorLock); + if (_stopping) { + throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC); + } + auto thread = std::make_unique<ThreadImpl>(*this, runnable, id, waitTime, maxProcessTime, ticksBeforeWait, cpu_category); + _threads.push_back(thread.get()); + return thread; +} + +void +ThreadPoolImpl::visitThreads(ThreadVisitor& visitor) const +{ + std::lock_guard lock(_threadVectorLock); + for (const ThreadImpl * thread : _threads) { + visitor.visitThread(thread->getId(), thread->getProperties(), thread->getTickData()); + } +} + +void +ThreadPoolImpl::unregisterThread(ThreadImpl& t) +{ + std::lock_guard lock(_threadVectorLock); + std::vector<ThreadImpl*> threads; + threads.reserve(_threads.size()); + for (ThreadImpl * thread : _threads) { + if (thread != &t) { + threads.push_back(thread); + } + } + _threads.swap(threads); +} + +} diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h new file mode 100644 index 00000000000..d6053a2f128 --- /dev/null +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -0,0 +1,35 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/storageframework/generic/thread/threadpool.h> +#include <vespa/fastos/thread.h> + +namespace storage::framework::defaultimplementation { + +class ThreadImpl; + +struct ThreadPoolImpl : public ThreadPool +{ + FastOS_ThreadPool _backendThreadPool; + std::vector<ThreadImpl*> _threads; + mutable std::mutex _threadVectorLock; + Clock & _clock; + bool _stopping; + +public: + ThreadPoolImpl(Clock&); + ~ThreadPoolImpl() override; + + Thread::UP startThread(Runnable&, vespalib::stringref id, vespalib::duration waitTime, + vespalib::duration maxProcessTime, int ticksBeforeWait, + std::optional<vespalib::CpuUsage::Category> cpu_category) override; + void visitThreads(ThreadVisitor&) const override; + + void registerThread(ThreadImpl&); + void unregisterThread(ThreadImpl&); + FastOS_ThreadPool& getThreadPool() { return _backendThreadPool; } + Clock& getClock() { return _clock; } +}; + +} diff --git a/storage/src/vespa/storageframework/generic/.gitignore b/storage/src/vespa/storageframework/generic/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/clock/.gitignore b/storage/src/vespa/storageframework/generic/clock/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/clock/CMakeLists.txt b/storage/src/vespa/storageframework/generic/clock/CMakeLists.txt new file mode 100644 index 00000000000..c95860f26fe --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_clock OBJECT + SOURCES + time.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/generic/clock/clock.h b/storage/src/vespa/storageframework/generic/clock/clock.h new file mode 100644 index 00000000000..c9b8f652bfe --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/clock.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Clock + * \ingroup clock + * + * \brief Class used to attain current time. + * + * This class wraps how the time is retrieved. A common clock is useful in order + * to let unit tests fake time. It is also useful to have one point for all + * time calculations, such that one can possibly optimize if time retrieval + * becomes a bottle neck. + */ + +#pragma once + +#include "time.h" +#include <memory> + +namespace storage::framework { + +struct Clock { + using UP = std::unique_ptr<Clock>; + + virtual ~Clock() = default; + + virtual MicroSecTime getTimeInMicros() const = 0; + virtual MilliSecTime getTimeInMillis() const = 0; + virtual SecondTime getTimeInSeconds() const = 0; + + // Time point resolution is intentionally not defined here. + virtual MonotonicTimePoint getMonotonicTime() const = 0; +}; + +} diff --git a/storage/src/vespa/storageframework/generic/clock/time.cpp b/storage/src/vespa/storageframework/generic/clock/time.cpp new file mode 100644 index 00000000000..7bf3fca3835 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/time.cpp @@ -0,0 +1,92 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "time.hpp" +#include "clock.h" +#include <iomanip> +#include <vector> +#include <cassert> +#include <sstream> + +namespace storage { +namespace framework { + +namespace { + void detectUnit(uint64_t& val, const char* unit, uint64_t size, + std::vector<std::pair<uint64_t, vespalib::string> >& units) { + if (val / size > 0) { + uint64_t value = val / size; + vespalib::string unitname = unit; + if (value != 1) unitname += "s"; + units.push_back(std::make_pair(value, unitname)); + val -= value * size; + } + } +} + +vespalib::string +getTimeString(uint64_t microSecondTime, TimeFormat format) +{ + // Rewrite to use other type of stream later if needed for performance + std::ostringstream ost; + if (format & DIFFERENCE_ALL) { + std::vector<std::pair<uint64_t, vespalib::string> > vals; + detectUnit(microSecondTime, "day", 24 * 60 * 60 * 1000 * 1000ull, vals); + detectUnit(microSecondTime, "hour", 60 * 60 * 1000 * 1000ull, vals); + detectUnit(microSecondTime, "minute", 60 * 1000 * 1000, vals); + detectUnit(microSecondTime, "second", 1000 * 1000, vals); + if (format & DIFFERENCE_WITH_MICROS) { + detectUnit(microSecondTime, "microsecond", 1, vals); + if (vals.empty()) { ost << "0 microseconds"; } + } else { + if (vals.empty()) { ost << "0 seconds"; } + } + if (vals.empty()) { + return vespalib::string(ost.str().c_str()); + } + ost << vals[0].first << " " << vals[0].second; + for (uint32_t i=1; i<vals.size(); ++i) { + if (i + 1 >= vals.size()) { + ost << " and "; + } else { + ost << ", "; + } + ost << vals[i].first << " " << vals[i].second; + } + return vespalib::string(ost.str().c_str()); + } + time_t secondTime = microSecondTime / 1000000; + struct tm datestruct; + struct tm* datestructptr = gmtime_r(&secondTime, &datestruct); + assert(datestructptr); + (void) datestructptr; + ost << std::setfill('0') + << std::setw(4) << (datestruct.tm_year + 1900) + << '-' << std::setw(2) << (datestruct.tm_mon + 1) + << '-' << std::setw(2) << datestruct.tm_mday + << ' ' << std::setw(2) << datestruct.tm_hour + << ':' << std::setw(2) << datestruct.tm_min + << ':' << std::setw(2) << datestruct.tm_sec; + uint64_t micros = microSecondTime % 1000000; + if (format == DATETIME_WITH_MILLIS) { + ost << '.' << std::setw(3) << (micros / 1000); + } else if (format == DATETIME_WITH_MICROS) { + ost << '.' << std::setw(6) << micros; + } + return vespalib::string(ost.str().c_str()); +} + +uint64_t +getRawMicroTime(const Clock& clock) +{ + return clock.getTimeInMicros().getTime(); +} + +template std::ostream& operator<< <MicroSecTime, 1>(std::ostream&, const Time<MicroSecTime, 1> &); +template std::ostream& operator<< <MilliSecTime, 1000>(std::ostream&, const Time<MilliSecTime, 1000> &); +template std::ostream& operator<< <SecondTime, 1000000>(std::ostream&, const Time<SecondTime, 1000000> &); + +template vespalib::asciistream& operator<< <MicroSecTime, 1>(vespalib::asciistream &, const Time<MicroSecTime, 1> &); +template vespalib::asciistream& operator<< <MilliSecTime, 1000>(vespalib::asciistream &, const Time<MilliSecTime, 1000> &); + +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/generic/clock/time.h b/storage/src/vespa/storageframework/generic/clock/time.h new file mode 100644 index 00000000000..9140ee67332 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/time.h @@ -0,0 +1,173 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/operators.hpp> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/time.h> + +namespace vespalib { + class asciistream; +} + +namespace storage::framework { + +using MonotonicTimePoint = std::chrono::steady_clock::time_point; +using MonotonicDuration = std::chrono::steady_clock::duration; + +struct Clock; + +enum TimeFormat { + DATETIME = 0x01, // 2010-04-26 19:23:03 + DATETIME_WITH_MILLIS = 0x02, // 2010-04-26 19:23:03.001 + DATETIME_WITH_MICROS = 0x04, // 2010-04-26 19:23:03.001023 + DATETIME_ALL = 0x07, + DIFFERENCE = 0x10, // 1 day, 4 hours, 43 minutes and 3 seconds + DIFFERENCE_WITH_MICROS = 0x20, // 1 day, 4 hours, 43 minutes, 3 seconds and 123123 microseconds + DIFFERENCE_ALL = 0x30 +}; + +/** + * Utility function used by Time instances (to avoid implementation in + * header file). + */ +vespalib::string getTimeString(uint64_t microSecondTime, TimeFormat format); + +// TODO deprecate framework time point and duration classes in favor of +// using std::chrono. + +// As this class can't include clock, this utility function can be used in +// header implementation to get actual time. +uint64_t getRawMicroTime(const Clock&); + +/** + * Class containing common functionality for the various time instances. Try to + * make time instances as easy to use as possible, without creating risk of + * automatic conversion between time types. + */ +template<typename Type, int MicrosPerUnit> +class Time : public boost::operators<Type> +{ + uint64_t _time; // time_t may be signed. Negative timestamps is just a + // source for bugs. Enforce unsigned. + +protected: + Time(uint64_t t) : _time(t) {} + +public: + uint64_t getTime() const { return _time; } + void setTime(uint64_t t) { _time = t; } + bool isSet() const { return (_time != 0); } + + Type& operator-=(const Type& o) + { _time -= o._time; return static_cast<Type&>(*this); } + Type& operator+=(const Type& o) + { _time += o._time; return static_cast<Type&>(*this); } + bool operator<(const Type& o) const { return (_time < o._time); } + bool operator==(const Type& o) const { return (_time == o._time); } + Type& operator++() { ++_time; return static_cast<Type&>(*this); } + Type& operator--() { --_time; return *this; } + + Type getDiff(const Type& o) const { + return Type(_time > o._time ? _time - o._time : o._time - _time); + } + + vespalib::string toString(TimeFormat timeFormat = DATETIME) const { + return getTimeString(_time * MicrosPerUnit, timeFormat); + } + + static Type max() { return Type(std::numeric_limits<uint64_t>().max()); } + static Type min() { return Type(0); } + +}; + +template<typename Type, typename Number> +Type& operator/(Type& type, Number n) { + type.setTime(type.getTime() / n); + return type; +} + +template<typename Type, typename Number> +Type& operator*(Type& type, Number n) { + type.setTime(type.getTime() * n); + return type; +} + +template<typename Type, int MPU> +std::ostream& operator<<(std::ostream& out, const Time<Type, MPU>& t); + +template<typename Type, int MPU> +vespalib::asciistream& operator<<(vespalib::asciistream& out, const Time<Type, MPU>& t); + +struct MicroSecTime; +struct MilliSecTime; + +/** + * \class storage::framework::SecondTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in seconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct SecondTime : public Time<SecondTime, 1000000> { + explicit SecondTime(uint64_t t = 0) : Time<SecondTime, 1000000>(t) {} + explicit SecondTime(const Clock& clock) + : Time<SecondTime, 1000000>(getRawMicroTime(clock) / 1000000) {} + + MilliSecTime getMillis() const; + MicroSecTime getMicros() const; +}; + +/** + * \class storage::framework::MilliSecTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in milliseconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct MilliSecTime : public Time<MilliSecTime, 1000> { + explicit MilliSecTime(uint64_t t = 0) : Time<MilliSecTime, 1000>(t) {} + explicit MilliSecTime(const Clock& clock) + : Time<MilliSecTime, 1000>(getRawMicroTime(clock) / 1000) {} + + SecondTime getSeconds() const { return SecondTime(getTime() / 1000); } + MicroSecTime getMicros() const; +}; + +/** + * \class storage::framework::MicroSecTime + * \ingroup clock + * + * \brief Wrapper class for a timestamp in seconds. + * + * To prevent errors where one passes time in one granularity to a function + * requiring time in another granularity. This little wrapper class exist to + * make sure that will conflict in types + */ +struct MicroSecTime : public Time<MicroSecTime, 1> { + explicit MicroSecTime(uint64_t t = 0) : Time<MicroSecTime, 1>(t) {} + explicit MicroSecTime(const Clock& clock) + : Time<MicroSecTime, 1>(getRawMicroTime(clock)) {} + + MilliSecTime getMillis() const { return MilliSecTime(getTime() / 1000); } + SecondTime getSeconds() const { return SecondTime(getTime() / 1000000); } +}; + +inline MilliSecTime SecondTime::getMillis() const { + return MilliSecTime(getTime() * 1000); +} + +inline MicroSecTime SecondTime::getMicros() const { + return MicroSecTime(getTime() * 1000 * 1000); +} + +inline MicroSecTime MilliSecTime::getMicros() const { + return MicroSecTime(getTime() * 1000); +} + +} diff --git a/storage/src/vespa/storageframework/generic/clock/time.hpp b/storage/src/vespa/storageframework/generic/clock/time.hpp new file mode 100644 index 00000000000..2cc3fb7be9d --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/time.hpp @@ -0,0 +1,21 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "time.h" +#include <vespa/vespalib/stllike/asciistream.h> + +namespace storage { +namespace framework { + +template<typename Type, int MPU> +std::ostream& operator<<(std::ostream& out, const Time<Type, MPU>& t) { + return out << t.getTime(); +} + +template<typename Type, int MPU> +vespalib::asciistream& operator<<(vespalib::asciistream& out, const Time<Type, MPU>& t) { + return out << t.getTime(); +} + +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/generic/clock/timer.h b/storage/src/vespa/storageframework/generic/clock/timer.h new file mode 100644 index 00000000000..2642a6decc4 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/clock/timer.h @@ -0,0 +1,39 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Timer + * \ingroup clock + * + * \brief Class used to measure time differences. + */ + +#pragma once + +#include "clock.h" + +namespace storage::framework { + +class MilliSecTimer { + const Clock* _clock; + MonotonicTimePoint _startTime; + +public: + MilliSecTimer(const Clock& clock) + : _clock(&clock), _startTime(_clock->getMonotonicTime()) {} + + // Copy construction makes the most sense when creating a timer that is + // intended to inherit another timer's start time point, without incurring + // the cost of an initial clock sampling. + MilliSecTimer(const MilliSecTimer&) = default; + MilliSecTimer& operator=(const MilliSecTimer&) = default; + + MonotonicDuration getElapsedTime() const { + return _clock->getMonotonicTime() - _startTime; + } + + double getElapsedTimeAsDouble() const { + using ToDuration = std::chrono::duration<double, std::milli>; + return std::chrono::duration_cast<ToDuration>(getElapsedTime()).count(); + } +}; + +} diff --git a/storage/src/vespa/storageframework/generic/component/.gitignore b/storage/src/vespa/storageframework/generic/component/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/component/CMakeLists.txt b/storage/src/vespa/storageframework/generic/component/CMakeLists.txt new file mode 100644 index 00000000000..69e639b89db --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_component OBJECT + SOURCES + component.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/generic/component/component.cpp b/storage/src/vespa/storageframework/generic/component/component.cpp new file mode 100644 index 00000000000..f45e1a0b839 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/component.cpp @@ -0,0 +1,105 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "component.h" +#include "componentregister.h" +#include <vespa/storageframework/generic/metric/metricregistrator.h> +#include <vespa/storageframework/generic/thread/threadpool.h> +#include <cassert> + +namespace storage::framework { + +void +Component::open() +{ + if (_listener != 0) _listener->onOpen(); +} + +void +Component::close() +{ + if (_listener != 0) _listener->onClose(); +} + +Component::Component(ComponentRegister& cr, vespalib::stringref name) + : _componentRegister(&cr), + _name(name), + _status(nullptr), + _metric(nullptr), + _threadPool(nullptr), + _metricReg(nullptr), + _clock(nullptr), + _listener(nullptr) +{ + cr.registerComponent(*this); +} + +Component::~Component() = default; + +void +Component::registerComponentStateListener(ComponentStateListener& l) +{ + assert(_listener == nullptr); + _listener = &l; +} + +void +Component::registerStatusPage(const StatusReporter& sr) +{ + assert(_status == nullptr); + _status = &sr; +} + +void +Component::registerMetric(metrics::Metric& m) +{ + assert(_metric == nullptr); + _metric = &m; + if (_metricReg != nullptr) { + _metricReg->registerMetric(m); + } +} + +void +Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period) +{ + assert(_metricUpdateHook.first == 0); + _metricUpdateHook = std::make_pair(&hook, period); + if (_metricReg != nullptr) { + _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); + } +} + +void +Component::setMetricRegistrator(MetricRegistrator& mr) { + _metricReg = &mr; + if (_metricUpdateHook.first != 0) { + _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second); + } + if (_metric != nullptr) { + _metricReg->registerMetric(*_metric); + } +} + +ThreadPool& +Component::getThreadPool() const +{ + assert(_threadPool != nullptr); + return *_threadPool; +} + +// Helper functions for components wanting to start a single thread. +Thread::UP +Component::startThread(Runnable& runnable, vespalib::duration waitTime, vespalib::duration maxProcessTime, + int ticksBeforeWait, std::optional<vespalib::CpuUsage::Category> cpu_category) +{ + return getThreadPool().startThread(runnable, getName(), waitTime, + maxProcessTime, ticksBeforeWait, cpu_category); +} + +void +Component::requestShutdown(vespalib::stringref reason) +{ + _componentRegister->requestShutdown(reason); +} + +} diff --git a/storage/src/vespa/storageframework/generic/component/component.h b/storage/src/vespa/storageframework/generic/component/component.h new file mode 100644 index 00000000000..09af1d276f3 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/component.h @@ -0,0 +1,203 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Component + * \ingroup component + * + * \brief Application component class + * + * The component class is used to give a component of an application a set of + * generic tools without depending on the implementation of these. + * + * This class should not depend on the actual implementation of what it serves. + * Neither in the header file nor the object file. Implementations will be + * provided by the application server. + * + * Services given should all be generic features. Application specific stuff + * we should handle in another way. The types of services offered are split in + * two. + * + * 1. Services component implementation has that it registers in the component + * such that application server can access this. Metrics and status + * reporting are two such services. + * 2. Services provided through application server, that the application + * server will inject into the component before it is opened for use. + * Clock, thread pool and memory management are examples of such services. + * + * The services offered with a short summary of what they provide are as + * follows: + * + * - Status reporters can register themselves as a reported in the component. + * A status server, for instance serving status information through a web + * server can thus fetch status pages wanted by clients and serve them. + * Status reporters thus don't need to know how status information is used. + * + * - A metric set can be registered, with a path for where in the application + * metric set it should exist. This way, the components do not have to know + * about metric management and the implementation of the metric manager. + * + * - A metric update hook can be registered. This will be called by the metric + * implementation at regular intervals or just before snapshotting/reporting. + * + * - A clock object is given. Using a common clock component all over the + * application makes us able to fake the clock in testing environments. + * Fetching current time is also a somewhat expensive operations we might + * do often, so having this common object to fetch it, we can easily + * optimize clock fetching as we see fit later. + * + * - A thread pool is given. This makes us able to use a thread pool. + * (Allthough currently we don't really need a thread pool, as threads + * typically live for the whole lifetime of the server. But currently we are + * forced to use a thread pool due to fastos.) Another feature of this is + * that the thread interface has built in information needed to detect + * deadlocks and report status about thread behavior, such that deadlock + * detecting and thread status can be shown without the threads themselves + * depending on how this is done. + * + * - A memory manager may also be provided, allowing components to request + * memory from a global pool, in order to let the application prioritize + * where to use memory. Again, this removes the dependency of how it is + * actually implemented to the components using it. + * + * Currently it is assumed that components are set up at application + * initialization time, and that they live as long as the application. Thus no + * unregister functionality is provided. Services that use registered status + * reporters or metric sets will shut down before the component is deleted, + * such that the component can be safely deleted without any unregistering + * needed. + */ +#pragma once + +#include "managedcomponent.h" +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/storageframework/generic/thread/thread.h> +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/vespalib/util/cpu_usage.h> +#include <atomic> +#include <optional> + +namespace storage::framework { + +struct ComponentRegister; + +struct ComponentStateListener { + virtual ~ComponentStateListener() = default; + + virtual void onOpen() {} + virtual void onClose() {} +}; + +class Component : private ManagedComponent +{ + ComponentRegister* _componentRegister; + vespalib::string _name; + const StatusReporter* _status; + metrics::Metric* _metric; + ThreadPool* _threadPool; + MetricRegistrator* _metricReg; + std::pair<MetricUpdateHook*, SecondTime> _metricUpdateHook; + const Clock* _clock; + ComponentStateListener* _listener; + std::atomic<UpgradeFlags> _upgradeFlag; + + UpgradeFlags loadUpgradeFlag() const { + return _upgradeFlag.load(std::memory_order_relaxed); + } + + // ManagedComponent implementation + metrics::Metric* getMetric() override { return _metric; } + std::pair<MetricUpdateHook*, SecondTime> getMetricUpdateHook() override { return _metricUpdateHook; } + const StatusReporter* getStatusReporter() override { return _status; } + void setMetricRegistrator(MetricRegistrator& mr) override; + void setClock(Clock& c) override { _clock = &c; } + void setThreadPool(ThreadPool& tp) override { _threadPool = &tp; } + void setUpgradeFlag(UpgradeFlags flag) override { + _upgradeFlag.store(flag, std::memory_order_relaxed); + } + void open() override; + void close() override; + +public: + using UP = std::unique_ptr<Component>; + + Component(ComponentRegister&, vespalib::stringref name); + virtual ~Component(); + + /** + * Register a component state listener, getting callbacks when components + * are started and stopped. An application might want to create all + * components before starting to do it's work. And it might stop doing work + * before starting to remove components. Using this listener, components + * may get callbacks in order to do some initialization after all components + * are set up, and to do some cleanups before other components are being + * removed. + */ + void registerComponentStateListener(ComponentStateListener&); + /** + * Register a status page, which might be visible to others through a + * component showing status of components. Only one status page can be + * registered per component. Use URI parameters in order to distinguish + * multiple pages. + */ + void registerStatusPage(const StatusReporter&); + + /** + * Register a metric (typically a metric set) used by this component. Only + * one metric set can be registered per component. Register a metric set in + * order to register many metrics within the component. + */ + void registerMetric(metrics::Metric&); + + /** + * Register a metric update hook. Only one per component. Note that the + * update hook will only be called if there actually is a metric mananger + * component registered in the application. + */ + void registerMetricUpdateHook(MetricUpdateHook&, SecondTime period); + + /** Get the name of the component. Must be a unique name. */ + const vespalib::string& getName() const override { return _name; } + + /** + * Get the thread pool for this application. Note that this call will fail + * before the application has registered a threadpool. Applications are + * encouraged to register a threadpool before adding components to avoid + * needing components to wait before accessing threadpool. + */ + ThreadPool& getThreadPool() const; + + /** + * Get the clock used in this application. This function will fail before + * the application has registered a clock implementation. Applications are + * encourated to register a clock implementation before adding components to + * avoid needing components to delay using it. + */ + const Clock& getClock() const { return *_clock; } + + /** + * Helper functions for components wanting to start a single thread. + * If max wait time is not set, we assume process time includes waiting. + * If max process time is not set, deadlock detector cannot detect deadlocks + * in this thread. (Thus one is not required to call registerTick()) + */ + Thread::UP startThread(Runnable&, + vespalib::duration maxProcessTime = vespalib::duration::zero(), + vespalib::duration waitTime = vespalib::duration::zero(), + int ticksBeforeWait = 1, + std::optional<vespalib::CpuUsage::Category> cpu_category = std::nullopt); + + // Check upgrade flag settings. Note that this flag may change at any time. + // Thus the results of these functions should not be cached. + bool isUpgradingToMajorVersion() const + { return (loadUpgradeFlag() == UPGRADING_TO_MAJOR_VERSION); } + bool isUpgradingToMinorVersion() const + { return (loadUpgradeFlag() == UPGRADING_TO_MINOR_VERSION); } + bool isUpgradingFromMajorVersion() const + { return (loadUpgradeFlag() == UPGRADING_FROM_MAJOR_VERSION); } + bool isUpgradingFromMinorVersion() const + { return (loadUpgradeFlag() == UPGRADING_FROM_MINOR_VERSION); } + + void requestShutdown(vespalib::stringref reason); + +}; + +} diff --git a/storage/src/vespa/storageframework/generic/component/componentregister.h b/storage/src/vespa/storageframework/generic/component/componentregister.h new file mode 100644 index 00000000000..da6aa3b51de --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/componentregister.h @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ComponentRegister + * \ingroup component + * + * \brief Application server implements this to get overview of all components. + * + * By implementing this class, the application server will get all the + * components it needs to manage using an interface containing just what it + * needs to minimize dependencies. + */ +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace storage::framework { + +struct ManagedComponent; + +struct ComponentRegister { + virtual ~ComponentRegister() {} + + virtual void registerComponent(ManagedComponent&) = 0; + virtual void requestShutdown(vespalib::stringref reason) = 0; +}; + +} diff --git a/storage/src/vespa/storageframework/generic/component/managedcomponent.h b/storage/src/vespa/storageframework/generic/component/managedcomponent.h new file mode 100644 index 00000000000..4cd142420cd --- /dev/null +++ b/storage/src/vespa/storageframework/generic/component/managedcomponent.h @@ -0,0 +1,75 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ManagedComponent + * \ingroup component + * + * \brief Interface to expose to manager of components. + * + * As to not make the functions needed by the component manager exposed to the + * component implementation, and vice versa, this interface exist to be what + * the manager is interested in. That way, component implementation can + * implement that privately, but expose it to the component register. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/stllike/string.h> + +namespace metrics { + class Metric; +} + +namespace storage::framework { + +struct StatusReporter; +struct MetricRegistrator; +struct MetricUpdateHook; +struct ThreadPool; +struct Clock; + +/** + * The upgrade flags can be used to add forward/backward compatability. In most + * cases, we can hopefully ignore this as next version is compatible. In some + * cases the new version might need to avoid doing requests the old version + * can't handle. In rare cases, the older version might have gotten some forward + * compatability code added which it might need to activate during an upgrade. + * + * Note that these flags must be set in an application when an upgrade requiring + * this is being performed. Upgrade docs should specify this if needed. + */ +enum UpgradeFlags { + // Indicates we're either not upgrading, or we're upgrading compatible + // versions so we doesn't need any special handling. + NO_UPGRADE_SPECIAL_HANDLING_ACTIVE, + // The cluster is being upgraded to this major version. We might need to + // send old type of messages to make older nodes understand what we send + UPGRADING_TO_MAJOR_VERSION, + // The cluster is being upgraded to this minor version. We might need to + // send old type of messages to make older nodes understand what we send + UPGRADING_TO_MINOR_VERSION, + // The cluster is being upgraded to the next major version. We might + // need to refrain from using functionality removed in the new version. + UPGRADING_FROM_MAJOR_VERSION, + // The cluster is being upgraded to the next minor version. We might + // need to refrain from using functionality removed in the new version. + UPGRADING_FROM_MINOR_VERSION +}; + +struct ManagedComponent { + virtual ~ManagedComponent() {} + + virtual const vespalib::string& getName() const = 0; + virtual metrics::Metric* getMetric() = 0; + virtual std::pair<MetricUpdateHook*, SecondTime> getMetricUpdateHook() = 0; + virtual const StatusReporter* getStatusReporter() = 0; + + virtual void setMetricRegistrator(MetricRegistrator&) = 0; + virtual void setClock(Clock&) = 0; + virtual void setThreadPool(ThreadPool&) = 0; + virtual void setUpgradeFlag(UpgradeFlags flag) = 0; + virtual void open() = 0; + virtual void close() = 0; + +}; + +} diff --git a/storage/src/vespa/storageframework/generic/metric/.gitignore b/storage/src/vespa/storageframework/generic/metric/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/metric/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/metric/CMakeLists.txt b/storage/src/vespa/storageframework/generic/metric/CMakeLists.txt new file mode 100644 index 00000000000..5881159862e --- /dev/null +++ b/storage/src/vespa/storageframework/generic/metric/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_metric INTERFACE + SOURCES + INSTALL lib64 + DEPENDS +) diff --git a/storage/src/vespa/storageframework/generic/metric/metricregistrator.h b/storage/src/vespa/storageframework/generic/metric/metricregistrator.h new file mode 100644 index 00000000000..1129bf29e45 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/metric/metricregistrator.h @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MetricRegistrator + * \ingroup metric + * + * \brief Interface used to register a metric in the backend. + * + * To avoid needing the framework module to depend on the metric system (in + * case any users don't use it), this class exist to remove this dependency. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/time.h> + +namespace metrics { + class Metric; +} + +namespace storage::framework { + +struct MetricUpdateHook; + +struct MetricRegistrator { + virtual ~MetricRegistrator() = default; + + virtual void registerMetric(metrics::Metric&) = 0; + virtual void registerUpdateHook(vespalib::stringref name, MetricUpdateHook& hook, SecondTime period) = 0; +}; + +} + diff --git a/storage/src/vespa/storageframework/generic/metric/metricupdatehook.h b/storage/src/vespa/storageframework/generic/metric/metricupdatehook.h new file mode 100644 index 00000000000..b45713cd0ce --- /dev/null +++ b/storage/src/vespa/storageframework/generic/metric/metricupdatehook.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::MetricUpdateHook + * \ingroup metric + * + * \brief Implement to get callbacks to update metrics periodically or just before reports/snapshots. + */ +#pragma once + +#include <mutex> + +namespace metrics { class MetricLockGuard; } + +namespace storage::framework { + +struct MetricUpdateHook { + using MetricLockGuard = metrics::MetricLockGuard; + virtual ~MetricUpdateHook() = default; + + virtual void updateMetrics(const MetricLockGuard &) = 0; +}; + +} diff --git a/storage/src/vespa/storageframework/generic/status/.gitignore b/storage/src/vespa/storageframework/generic/status/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/status/CMakeLists.txt b/storage/src/vespa/storageframework/generic/status/CMakeLists.txt new file mode 100644 index 00000000000..a629e632b78 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_status OBJECT + SOURCES + statusreporter.cpp + htmlstatusreporter.cpp + xmlstatusreporter.cpp + httpurlpath.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp new file mode 100644 index 00000000000..9b7c4919403 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "htmlstatusreporter.h" + +namespace storage::framework { + +HtmlStatusReporter::HtmlStatusReporter(vespalib::stringref id, + vespalib::stringref name) + : StatusReporter(id, name) +{ +} + +HtmlStatusReporter::~HtmlStatusReporter() = default; + +void +HtmlStatusReporter::reportHtmlHeader(std::ostream& out, + const HttpUrlPath& path) const +{ + out << "<html>\n" + << "<head>\n" + << " <title>" << getName() << "</title>\n"; + reportHtmlHeaderAdditions(out, path); + out << "</head>\n" + << "<body>\n" + << " <h1>" << getName() << "</h1>\n"; +} + +void +HtmlStatusReporter::reportHtmlFooter(std::ostream& out, + const HttpUrlPath&) const +{ + out << "</body>\n</html>\n"; +} + +vespalib::string +HtmlStatusReporter::getReportContentType(const HttpUrlPath&) const +{ + return "text/html"; +} + +bool +HtmlStatusReporter::reportStatus(std::ostream& out, + const HttpUrlPath& path) const +{ + if (!isValidStatusRequest()) return false; + reportHtmlHeader(out, path); + reportHtmlStatus(out, path); + reportHtmlFooter(out, path); + return true; +} + +} diff --git a/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h new file mode 100644 index 00000000000..4ffba20a3fa --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/htmlstatusreporter.h @@ -0,0 +1,64 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::HtmlStatusReporter + * \ingroup component + * + * \brief Specialization of StatusReporter for reporters of HTML data. + * + * To avoid code duplication, and to let all HTML status reporters be able + * to look consistently, this specialization exist to have a common place to + * implement common HTML parts printed. + * + * Note: If you want to write HTTP from a status reporter that can also write + * other types of content, best practise is to instantiate the + * PartlyHtmlStatusReporter to print the HTML headers and footers. + */ + +#pragma once + +#include "statusreporter.h" + +namespace storage::framework { + +struct HtmlStatusReporter : public StatusReporter { + HtmlStatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~HtmlStatusReporter(); + + /** + * The default HTML header writer uses this function to allow page to add + * some code in the <head></head> part of the HTML, such as javascript + * functions. + */ + virtual void reportHtmlHeaderAdditions(std::ostream&, + const HttpUrlPath&) const {} + + /** + * Write a default HTML header. It writes the start of an HTML + * file, including a body statement and a header with component name. + */ + virtual void reportHtmlHeader(std::ostream&, const HttpUrlPath&) const; + + /** Overwrite to write the actual HTML content. */ + virtual void reportHtmlStatus(std::ostream&, const HttpUrlPath&) const = 0; + + /** Writes a default HTML footer. Includes closing the body tag. */ + virtual void reportHtmlFooter(std::ostream&, const HttpUrlPath&) const; + + // Implementation of StatusReporter interface + vespalib::string getReportContentType(const HttpUrlPath&) const override; + bool reportStatus(std::ostream&, const HttpUrlPath&) const override; +}; + +/** + * This class can be used if your status reporter only reports HTML in some + * instances. Then you can create an instance of this class in order to write + * the HTML headers and footers when needed. + */ +struct PartlyHtmlStatusReporter : public HtmlStatusReporter { + PartlyHtmlStatusReporter(const StatusReporter& main) + : HtmlStatusReporter(main.getId(), main.getName()) {} + + void reportHtmlStatus(std::ostream&, const HttpUrlPath&) const override {} +}; + +} diff --git a/storage/src/vespa/storageframework/generic/status/httpurlpath.cpp b/storage/src/vespa/storageframework/generic/status/httpurlpath.cpp new file mode 100644 index 00000000000..b55b6dee06e --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/httpurlpath.cpp @@ -0,0 +1,93 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "httpurlpath.h" +#include <vespa/vespalib/text/stringtokenizer.h> + +namespace storage::framework { + +HttpUrlPath::HttpUrlPath(const vespalib::string& urlpath) + : _path(), + _attributes(), + _serverSpec() +{ + init(urlpath); +} + +HttpUrlPath::HttpUrlPath(const vespalib::string& urlpath, + const vespalib::string& serverSpec) + : _path(), + _attributes(), + _serverSpec(serverSpec) +{ + init(urlpath); +} + +HttpUrlPath::HttpUrlPath(vespalib::string path, + std::map<vespalib::string, vespalib::string> attributes, + vespalib::string serverSpec) + : _path(std::move(path)), + _attributes(std::move(attributes)), + _serverSpec(std::move(serverSpec)) +{ +} + +HttpUrlPath::~HttpUrlPath() {} + +void +HttpUrlPath::init(const vespalib::string &urlpath) +{ + vespalib::string::size_type pos = urlpath.find('?'); + if (pos == vespalib::string::npos) { + _path = urlpath; + } else { + _path = urlpath.substr(0, pos); + vespalib::string sub(urlpath.substr(pos+1)); + vespalib::StringTokenizer tokenizer(sub, "&", ""); + for (uint32_t i=0, n=tokenizer.size(); i<n; ++i) { + const vespalib::string& s(tokenizer[i]); + pos = s.find('='); + if (pos == vespalib::string::npos) { + _attributes[s] = ""; + } else { + _attributes[s.substr(0,pos)] = s.substr(pos+1); + } + } + } +} + +bool +HttpUrlPath::hasAttribute(const vespalib::string& id) const +{ + return (_attributes.find(id) != _attributes.end()); +} + +vespalib::string +HttpUrlPath::getAttribute(const vespalib::string& id, + const vespalib::string& defaultValue) const +{ + std::map<vespalib::string, vespalib::string>::const_iterator it + = _attributes.find(id); + return (it == _attributes.end() ? defaultValue : it->second); +} + +void +HttpUrlPath::print(std::ostream& out, bool, const std::string&) const +{ + out << _path; + if (!_attributes.empty()) { + out << "?"; + size_t cnt = 0; + for (const auto &attr: _attributes) { + if (cnt++ > 0) { + out << "&"; + } + out << attr.first; + if (!attr.second.empty()) { + out << "="; + out << attr.second; + } + } + } +} + +} diff --git a/storage/src/vespa/storageframework/generic/status/httpurlpath.h b/storage/src/vespa/storageframework/generic/status/httpurlpath.h new file mode 100644 index 00000000000..4835365259f --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/httpurlpath.h @@ -0,0 +1,60 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Utility class to parse the url-path part of an HTTP URL. + * Used by status module. + */ + +#pragma once + +#include <vespa/vespalib/util/printable.h> +#include <vespa/vespalib/stllike/string.h> +#include <map> +#include <sstream> + +namespace storage::framework { + +class HttpUrlPath : public vespalib::Printable { + vespalib::string _path; + std::map<vespalib::string, vespalib::string> _attributes; + vespalib::string _serverSpec; // "host:port" + + void init(const vespalib::string &urlpath); + +public: + HttpUrlPath(const vespalib::string& urlpath); + HttpUrlPath(const vespalib::string& urlpath, const vespalib::string& serverSpec); + HttpUrlPath(vespalib::string path, + std::map<vespalib::string, vespalib::string> attributes, + vespalib::string serverSpec); + ~HttpUrlPath(); + + const vespalib::string& getPath() const { return _path; } + const std::map<vespalib::string, vespalib::string>& getAttributes() const + { return _attributes; } + + bool hasAttribute(const vespalib::string& id) const; + vespalib::string getAttribute(const vespalib::string& id, + const vespalib::string& defaultValue = "") const; + + const vespalib::string& getServerSpec() const { + return _serverSpec; + } + + template<typename T> + T get(const vespalib::string& id, const T& defaultValue = T()) const; + + void print(std::ostream& out, bool verbose, const std::string& indent) const override; +}; + +template<typename T> +T HttpUrlPath::get(const vespalib::string& id, const T& defaultValue) const +{ + std::map<vespalib::string, vespalib::string>::const_iterator it = _attributes.find(id); + if (it == _attributes.end()) return defaultValue; + T val; + std::istringstream ist(it->second); + ist >> val; + return val; +} + +} diff --git a/storage/src/vespa/storageframework/generic/status/statusreporter.cpp b/storage/src/vespa/storageframework/generic/status/statusreporter.cpp new file mode 100644 index 00000000000..7314dddde7f --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/statusreporter.cpp @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "statusreporter.h" + +namespace storage { +namespace framework { + +StatusReporter::StatusReporter(vespalib::stringref id, vespalib::stringref name) + : _id(id), + _name(name) +{ +} + +StatusReporter::~StatusReporter() +{ +} + +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/generic/status/statusreporter.h b/storage/src/vespa/storageframework/generic/status/statusreporter.h new file mode 100644 index 00000000000..cc48bb841fd --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/statusreporter.h @@ -0,0 +1,61 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::StatusReporter + * \ingroup component + * + * \brief Interface to implement for status reporters. + * + * Components that wants to make status pages available can implement this + * interface in order to provide status information without depending on how + * this information is server. Status data is typically available through an + * HTTP server running in the process. + * + * Specializations of this interface exists for HTML and XML outputters. + */ +#pragma once + +#include <ostream> +#include <vespa/storageframework/generic/status/httpurlpath.h> +#include <vespa/vespalib/stllike/string.h> + +namespace storage::framework { + +struct StatusReporter +{ + StatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~StatusReporter(); + + /** + * Get the identifier. The identifier is a string matching regex + * ^[A-Za-z0-9_]+$. It is used to identify the status page in contexts where + * special characters are not wanted, such as in an URL. + */ + const vespalib::string& getId() const { return _id; } + /** + * Get the descriptive name of the status reported. This string should be + * able to contain anything. + */ + const vespalib::string& getName() const { return _name; } + + virtual bool isValidStatusRequest() const { return true; } + + /** + * Called to get content type. + * An empty string indicates page not found. + */ + virtual vespalib::string getReportContentType(const HttpUrlPath&) const = 0; + + /** + * Called to get the actual content to return in the status request. + * @return False if no such page exist, in which case you should not have + * written to the output stream. + */ + virtual bool reportStatus(std::ostream&, const HttpUrlPath&) const = 0; + +private: + vespalib::string _id; + vespalib::string _name; + +}; + +} diff --git a/storage/src/vespa/storageframework/generic/status/statusreportermap.h b/storage/src/vespa/storageframework/generic/status/statusreportermap.h new file mode 100644 index 00000000000..8d72215df49 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/statusreportermap.h @@ -0,0 +1,24 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::StatusReporterMap + * \ingroup status + * + * \brief Interface to access the various status reporters + */ +#pragma once + +#include <vector> + +namespace storage::framework { + +struct StatusReporter; + +struct StatusReporterMap { + virtual ~StatusReporterMap() {} + + virtual const StatusReporter* getStatusReporter(vespalib::stringref id) = 0; + + virtual std::vector<const StatusReporter*> getStatusReporters() = 0; +}; + +} // storage::framework diff --git a/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp b/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp new file mode 100644 index 00000000000..258a3ea53de --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.cpp @@ -0,0 +1,61 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "xmlstatusreporter.h" +#include <cassert> + +namespace storage { +namespace framework { + +XmlStatusReporter::XmlStatusReporter(vespalib::stringref id, + vespalib::stringref name) + : StatusReporter(id, name) +{ +} + +XmlStatusReporter::~XmlStatusReporter() +{ +} + +void +XmlStatusReporter::initXmlReport(vespalib::XmlOutputStream& xos, + const HttpUrlPath&) const +{ + using namespace vespalib::xml; + xos << XmlTag("status") + << XmlAttribute("id", getId()) + << XmlAttribute("name", getName()); +} + +void +XmlStatusReporter::finalizeXmlReport(vespalib::XmlOutputStream& xos, + const HttpUrlPath&) const +{ + using namespace vespalib::xml; + xos << XmlEndTag(); + assert(xos.isFinalized()); +} + +vespalib::string +XmlStatusReporter::getReportContentType(const HttpUrlPath&) const +{ + return "application/xml"; +} + +bool +XmlStatusReporter::reportStatus(std::ostream& out, + const HttpUrlPath& path) const +{ + out << "<?xml version=\"1.0\"?>\n"; + vespalib::XmlOutputStream xos(out); + initXmlReport(xos, path); + vespalib::stringref failure = reportXmlStatus(xos, path); + if (!failure.empty()) { + using namespace vespalib::xml; + xos << XmlContent("Failed to report XML status: " + failure); + } + finalizeXmlReport(xos, path); + return failure.empty(); +} + +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.h b/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.h new file mode 100644 index 00000000000..eff5a44148e --- /dev/null +++ b/storage/src/vespa/storageframework/generic/status/xmlstatusreporter.h @@ -0,0 +1,79 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::XmlStatusReporter + * \ingroup component + * + * \brief Specialization of StatusReporter for reporters of XML data. + * + * To make it easy to write legal XML and escape content that needs to be + * escaped, an XML writer is used to write the actual XML data. + * + * Note: If you want to write XML from a status reporter that can also write + * other types of content, best practise is to implement StatusReporter, and if + * serving XML in the reportStatus function, create a temporary + * XmlStatusReporter object, in order to reuse the report functions to init + * and finalize XML writing. + */ + +#pragma once + +#include "statusreporter.h" +#include <vespa/vespalib/util/xmlstream.h> + +namespace storage::framework { + +struct XmlStatusReporter : public StatusReporter { + XmlStatusReporter(vespalib::stringref id, vespalib::stringref name); + virtual ~XmlStatusReporter(); + + virtual void initXmlReport(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const; + + /** + * @return Empty string if ok, otherwise indicate a failure condition. + */ + virtual vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const = 0; + + virtual void finalizeXmlReport(vespalib::xml::XmlOutputStream&, + const HttpUrlPath&) const; + + // Implementation of status reporter interface + vespalib::string getReportContentType(const HttpUrlPath&) const override; + bool reportStatus(std::ostream&, const HttpUrlPath&) const override; +}; + +/** + * If you're only reporting XML in some cases, you can use this instance to + * wrap the actual XML parts, so you can reuse the code that outputs the XML. + * Just use output operator in this class to add the actual XML. + */ +class PartlyXmlStatusReporter : public XmlStatusReporter { + vespalib::XmlOutputStream _xos; + const HttpUrlPath& _path; + +public: + PartlyXmlStatusReporter(const StatusReporter& main, std::ostream& out, + const HttpUrlPath& path) + : XmlStatusReporter(main.getId(), main.getName()), + _xos(out), + _path(path) + { + initXmlReport(_xos, path); + } + + ~PartlyXmlStatusReporter() { + finalizeXmlReport(_xos, _path); + } + + vespalib::XmlOutputStream& getStream() { return _xos; } + vespalib::string reportXmlStatus(vespalib::xml::XmlOutputStream&, const HttpUrlPath&) const override { return ""; } + + template<typename T> + PartlyXmlStatusReporter& operator<<(const T& v) { + _xos << v; + return *this; + } +}; + +} diff --git a/storage/src/vespa/storageframework/generic/thread/.gitignore b/storage/src/vespa/storageframework/generic/thread/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/storage/src/vespa/storageframework/generic/thread/CMakeLists.txt b/storage/src/vespa/storageframework/generic/thread/CMakeLists.txt new file mode 100644 index 00000000000..a392725b771 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(storageframework_thread OBJECT + SOURCES + thread.cpp + threadpool.cpp + tickingthread.cpp + DEPENDS +) diff --git a/storage/src/vespa/storageframework/generic/thread/runnable.h b/storage/src/vespa/storageframework/generic/thread/runnable.h new file mode 100644 index 00000000000..b1fd2eae237 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/runnable.h @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Runnable + * \ingroup thread + * + * \brief Minimal API for something that can be run. + * + * Minimum API to implement to be able to be run by a thread. + */ +#pragma once + +#include <vespa/storageframework/generic/clock/time.h> + +namespace storage::framework { + +/** + * A cycle type can be given when registering ticks. This is useful for + * monitoring, to see the difference between cycles that is just waiting and + * cycles that are processing. If this information is known, the monitoring + * tools can see that the longest process cycle have been 5 ms, even though + * the thread is waiting for 1000 ms when it is idle. + */ +enum CycleType { UNKNOWN_CYCLE, WAIT_CYCLE, PROCESS_CYCLE }; +const char* getCycleTypeName(CycleType); + +struct ThreadHandle { + virtual ~ThreadHandle() {} + + /** Check whether thread have been interrupted or not. */ + virtual bool interrupted() const = 0; + + /** + * Register a tick. Useful such that a deadlock detector can detect that + * threads are actually doing something. If cycle types are specified, + * deadlock detector can specifically know what thread has been doing and + * used appropriate max limit. On unknown cycles, less information is + * available, and deadlock detector will use sum of wait and process time. + * + * The cycle type specified is for the cycle that just passed. + * + * @param currentTime Callers can set current time such that backend does + * not need to calculate clock. (Too avoid additional + * clock fetches if client already knows current time) + */ + virtual void registerTick(CycleType = UNKNOWN_CYCLE, + vespalib::steady_time = vespalib::steady_time()) = 0; + + virtual vespalib::duration getWaitTime() const = 0; + + /** + * The number of ticks done before wait is called when no more work is + * reported. + */ + virtual int getTicksBeforeWait() const = 0; +}; + +struct Runnable { + virtual ~Runnable() {} + + virtual void run(ThreadHandle&) = 0; +}; + +} // storage::framework diff --git a/storage/src/vespa/storageframework/generic/thread/taskthread.h b/storage/src/vespa/storageframework/generic/thread/taskthread.h new file mode 100644 index 00000000000..49f61fa0f13 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/taskthread.h @@ -0,0 +1,71 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Implementation of ticking threads for performing prioritized tasks. + * Implements critical section and a prioritized queue for communication + * outside of thread. + * + * Note that doNonCriticalTick is not implemented to call a processTask() + * function, as applications might want to do something even if there is no + * task, prioritize something above a task at some time, or process multiple + * tasks in one tick (to reduce locking overhead).. Thus we expect most apps to + * want to implement doNonCriticalTick() anyhow, so rather we just make + * available functions for peeking and extracting tasks. + */ + +#pragma once + +#include <queue> +#include <vespa/storageframework/generic/thread/tickingthread.h> + +namespace storage { +namespace framework { + +template <typename Task> +class TaskThread : public TickingThread { + ThreadLock& _lock; + std::vector<Task> _enqueued; + std::priority_queue<Task> _tasks; + +public: + TaskThread(ThreadLock& lock); + + void addTask(const Task& t); + ThreadWaitInfo doCriticalTick(ThreadIndex) override; + + bool empty() const { return _tasks.empty(); } + const Task& peek() const { return _tasks.top(); } + void pop() { _tasks.pop(); } + +private: + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) override = 0; +}; + +template <typename Task> +TaskThread<Task>::TaskThread(ThreadLock& lock) + : _lock(lock) +{ +} + +template <typename Task> +void +TaskThread<Task>::addTask(const Task& t) +{ + TickingLockGuard lock(_lock.freezeCriticalTicks()); + _enqueued.push_back(t); + lock.broadcast(); +} + +template <typename Task> +ThreadWaitInfo +TaskThread<Task>::doCriticalTick(ThreadIndex) { + std::vector<Task> enqueued; + enqueued.swap(_enqueued); + for (size_t i=0, n=enqueued.size(); i<n; ++i) { + _tasks.push(enqueued[i]); + } + return ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; +} + + +} // framework +} // storage diff --git a/storage/src/vespa/storageframework/generic/thread/thread.cpp b/storage/src/vespa/storageframework/generic/thread/thread.cpp new file mode 100644 index 00000000000..388ac93a9b5 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/thread.cpp @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "thread.h" + +namespace storage::framework { + +void +Thread::interruptAndJoin() +{ + interrupt(); + join(); +} + +void +Thread::interruptAndJoin(std::condition_variable &cv) +{ + interrupt(); + cv.notify_all(); + join(); +} + +} diff --git a/storage/src/vespa/storageframework/generic/thread/thread.h b/storage/src/vespa/storageframework/generic/thread/thread.h new file mode 100644 index 00000000000..c17638a0d42 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/thread.h @@ -0,0 +1,57 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::Thread + * \ingroup thread + * + * \brief A wrapper for a thread class. + * + * This thread class exist to hide the actual implementation of threads used, + * and to give some extra information about the threads. This is in turned used + * by monitoring, to be able to see data about the threads running. One such + * monitoring tool is the deadlock detector. + */ +#pragma once + +#include "runnable.h" +#include <condition_variable> + +namespace storage::framework { + +class Thread : public ThreadHandle { + vespalib::string _id; + +public: + typedef std::unique_ptr<Thread> UP; + + Thread(vespalib::stringref id) : _id(id) {} + virtual ~Thread() = default; + + virtual const vespalib::string& getId() const { return _id; } + + /** Check whether thread have been interrupted or not. */ + virtual bool interrupted() const override = 0; + /** Check whether thread have been joined or not. */ + virtual bool joined() const = 0; + + /** + * Call this function to set interrupt flag, such that later calls to + * interrupt returns true. If called on already interrupted thread it is a + * noop. + */ + virtual void interrupt() = 0; + /** + * Call this function to wait until thread has finished processing. If + * called after thread has already finished, it is a noop. + */ + virtual void join() = 0; + + /** + * Utility function to interrupt and join a thread, possibly broadcasting + * through a monitor after the signalling face. + */ + void interruptAndJoin(); + + void interruptAndJoin(std::condition_variable &cv); +}; + +} diff --git a/storage/src/vespa/storageframework/generic/thread/threadpool.cpp b/storage/src/vespa/storageframework/generic/thread/threadpool.cpp new file mode 100644 index 00000000000..480e42c91ef --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/threadpool.cpp @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "threadpool.h" + +namespace storage::framework { + +ThreadProperties::ThreadProperties(vespalib::duration waitTime, + vespalib::duration maxProcessTime, + int ticksBeforeWait) + : _maxProcessTime(maxProcessTime), + _waitTime(waitTime), + _ticksBeforeWait(ticksBeforeWait) +{ +} + +} diff --git a/storage/src/vespa/storageframework/generic/thread/threadpool.h b/storage/src/vespa/storageframework/generic/thread/threadpool.h new file mode 100644 index 00000000000..7607932e079 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/threadpool.h @@ -0,0 +1,94 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * \class storage::framework::ThreadPool + * \ingroup thread + * + * \brief A threadpool implementation usable by storage components. + * + * Using this threadpool interface, we can use a threadpool without depending + * on the actual implementation. Also, as information is provided of the + * threads, monitoring tools, like the deadlock detector can extract information + * about the threads. + */ +#pragma once + +#include <atomic> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/storageframework/generic/thread/thread.h> +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/util/cpu_usage.h> +#include <optional> +#include <vector> + +namespace storage::framework { + +/** + * Each thread may have different properties, as to how long they wait between + * ticks and how long they're supposed to use processing between ticks. To be + * able to specify this per thread, a set of properties can be set by each + * thread. + */ +class ThreadProperties { +private: + /** + * Time this thread should maximum use to process before a tick is + * registered. (Including wait time if wait time is not set) + */ + vespalib::duration _maxProcessTime; + /** + * Time this thread will wait in a non-interrupted wait cycle. + * Used in cases where a wait cycle is registered. As long as no other + * time consuming stuff is done in a wait cycle, you can just use the + * wait time here. The deadlock detector should add a configurable + * global time period before flagging deadlock anyways. + */ + vespalib::duration _waitTime; + /** + * Number of ticks to be done before a wait. + */ + uint32_t _ticksBeforeWait; + + public: + ThreadProperties(vespalib::duration waitTime, + vespalib::duration maxProcessTime, + int ticksBeforeWait); + + vespalib::duration getMaxProcessTime() const { return _maxProcessTime; } + vespalib::duration getWaitTime() const { return _waitTime; } + int getTicksBeforeWait() const { return _ticksBeforeWait; } + + vespalib::duration getMaxCycleTime() const { + return std::max(_maxProcessTime, _waitTime); + } +}; + +/** Data kept on each thread due to the registerTick functinality. */ +struct ThreadTickData { + CycleType _lastTickType; + vespalib::steady_time _lastTick; + vespalib::duration _maxProcessingTimeSeen; + vespalib::duration _maxWaitTimeSeen; +}; + +/** Interface used to access data for the existing threads. */ +struct ThreadVisitor { + virtual ~ThreadVisitor() = default; + virtual void visitThread(const vespalib::string& id, + const ThreadProperties&, + const ThreadTickData&) = 0; +}; + +struct ThreadPool { + virtual ~ThreadPool() = default; + + virtual Thread::UP startThread(Runnable&, + vespalib::stringref id, + vespalib::duration waitTime, + vespalib::duration maxProcessTime, + int ticksBeforeWait, + std::optional<vespalib::CpuUsage::Category> cpu_category) = 0; + + virtual void visitThreads(ThreadVisitor&) const = 0; +}; + +} diff --git a/storage/src/vespa/storageframework/generic/thread/tickingthread.cpp b/storage/src/vespa/storageframework/generic/thread/tickingthread.cpp new file mode 100644 index 00000000000..1a9cb459f28 --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/tickingthread.cpp @@ -0,0 +1,226 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "tickingthread.h" +#include "threadpool.h" +#include <vespa/vespalib/stllike/asciistream.h> +#include <cassert> + +namespace storage::framework { + +ThreadWaitInfo ThreadWaitInfo::MORE_WORK_ENQUEUED(false); +ThreadWaitInfo ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN(true); + +void +ThreadWaitInfo::merge(const ThreadWaitInfo& other) { + if (!other._waitWanted) { + _waitWanted = false; + } +} + +/** + * \brief Implementation actually doing lock handling, waiting, and allowing a + * global synchronization point where no thread is currently running. + */ +class TickingThreadRunner final : public Runnable { + std::mutex & _monitor; + std::condition_variable & _cond; + TickingThread & _tickingThread; + uint32_t _threadIndex; + bool _wantToFreeze; + bool _frozen; + char _state; + +public: + typedef std::shared_ptr<TickingThreadRunner> SP; + + TickingThreadRunner(std::mutex& m, + std::condition_variable & cond, + TickingThread& ticker, + uint32_t threadIndex) noexcept + : _monitor(m), _cond(cond), _tickingThread(ticker), + _threadIndex(threadIndex), _wantToFreeze(false), _frozen(false) {} + + /** + * Call to freeze this thread. Returns then the thread has done executing + * tick and has frozen. + */ + void freeze() { + std::unique_lock guard(_monitor); + _wantToFreeze = true; + while (!_frozen) { + _cond.wait(guard); + } + } + + /** + * Call to thaw up a frozen thread so it can continue. + */ + void thaw() { + { + std::lock_guard guard(_monitor); + _wantToFreeze = false; + } + _cond.notify_all(); + } + + char getState() const { return _state; } + +private: + void run(ThreadHandle& handle) override { + ThreadWaitInfo info = ThreadWaitInfo::MORE_WORK_ENQUEUED; + CycleType cycle = PROCESS_CYCLE; + int ticksExecutedAfterWait = 0; + while (!handle.interrupted()) { + { + std::unique_lock guard(_monitor); + if (info.waitWanted()) { + _state = 'w'; + cycle = WAIT_CYCLE; + if (ticksExecutedAfterWait >= handle.getTicksBeforeWait()) { + _cond.wait_for(guard, handle.getWaitTime()); + ticksExecutedAfterWait = 0; + } + } + if (_wantToFreeze) { + _state = 'f'; + doFreeze(guard, _cond); + ticksExecutedAfterWait = 0; + } + _state = 'c'; + info.merge(_tickingThread.doCriticalTick(_threadIndex)); + _state = 'n'; + } + handle.registerTick(cycle); + ticksExecutedAfterWait++; + cycle = PROCESS_CYCLE; + info = _tickingThread.doNonCriticalTick(_threadIndex); + } + _state = 's'; + } + void doFreeze(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { + _frozen = true; + cond.notify_all(); + while (_wantToFreeze) { + _cond.wait(guard); + } + _frozen = false; + } +}; + +class TickingThreadPoolImpl final : public TickingThreadPool { + const vespalib::string _name; + const vespalib::duration _waitTime; + const vespalib::duration _maxProcessTime; + const uint32_t _ticksBeforeWait; + std::mutex _lock; + std::condition_variable _cond; + std::vector<TickingThreadRunner::SP> _tickers; + std::vector<std::shared_ptr<Thread>> _threads; + + struct FreezeGuard final : public TickingLockGuard::Impl { + TickingThreadPoolImpl& _pool; + + explicit FreezeGuard(TickingThreadPoolImpl& pool) : _pool(pool) { _pool.freeze(); } + ~FreezeGuard() override { _pool.thaw(); } + void broadcast() override {} + }; + struct CriticalGuard final : public TickingLockGuard::Impl { + std::unique_lock<std::mutex> _guard; + std::condition_variable &_cond; + + explicit CriticalGuard(std::mutex & lock, std::condition_variable & cond) : _guard(lock), _cond(cond) {} + + void broadcast() override { _cond.notify_all(); } + }; + +public: + TickingThreadPoolImpl(vespalib::stringref name, vespalib::duration waitTime, + int ticksBeforeWait, vespalib::duration maxProcessTime) + : _name(name), + _waitTime(waitTime), + _maxProcessTime(maxProcessTime), + _ticksBeforeWait(ticksBeforeWait) + { } + + ~TickingThreadPoolImpl() override { + stop(); + } + + void addThread(TickingThread& ticker) override { + ThreadIndex index = _tickers.size(); + ticker.newThreadCreated(index); + _tickers.emplace_back(std::make_shared<TickingThreadRunner>(_lock, _cond, ticker, index)); + } + + void start(ThreadPool& pool) override { + assert(!_tickers.empty()); + for (uint32_t i=0; i<_tickers.size(); ++i) { + vespalib::asciistream ost; + ost << _name.c_str() << " thread " << i; + _threads.push_back(std::shared_ptr<Thread>(pool.startThread( + *_tickers[i], + ost.str(), + _waitTime, + _maxProcessTime, + _ticksBeforeWait, std::nullopt))); + } + } + + TickingLockGuard freezeAllTicks() override { + return TickingLockGuard(std::make_unique<FreezeGuard>(*this)); + } + + TickingLockGuard freezeCriticalTicks() override { + return TickingLockGuard(std::make_unique<CriticalGuard>(_lock, _cond)); + } + + void stop() override { + for (auto& t : _threads) { + t->interrupt(); + } + { + _cond.notify_all(); + } + for (auto& t : _threads) { + t->join(); + } + } + + vespalib::string getStatus() override { + vespalib::string result(_tickers.size(), ' '); + for (uint32_t i=0, n=_tickers.size(); i<n; ++i) { + result[i] = _tickers[i]->getState(); + } + return result; + } + +private: + void freeze() { + for (auto& t : _tickers) { + t->freeze(); + } + } + + void thaw() { + for (auto& t : _tickers) { + t->thaw(); + } + } +}; + +TickingThreadPool::UP +TickingThreadPool::createDefault( + vespalib::stringref name, + vespalib::duration waitTime, + int ticksBeforeWait, + vespalib::duration maxProcessTime) +{ + return std::make_unique<TickingThreadPoolImpl>(name, waitTime, ticksBeforeWait, maxProcessTime); +} + +TickingThreadPool::UP +TickingThreadPool::createDefault(vespalib::stringref name, vespalib::duration waitTime) +{ + return createDefault(name, waitTime, 1, 5s); +} + +} // storage::framework diff --git a/storage/src/vespa/storageframework/generic/thread/tickingthread.h b/storage/src/vespa/storageframework/generic/thread/tickingthread.h new file mode 100644 index 00000000000..9ddc47c8f3a --- /dev/null +++ b/storage/src/vespa/storageframework/generic/thread/tickingthread.h @@ -0,0 +1,99 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * This file contains a utility function to handle threads doing a lot of + * single ticks. It brings the following functionality: + * + * - Give application setting up the threads a way to synchronize all the + * threads so it can perform some operation while no thread is ticking. + * - Give multiple threads a way to use common lock for critical region, such + * that you can divide responsible between multiple threads, and still have + * a way to notify and wait for all. + * - Automatically implement registration in deadlock handler, and updating + * tick times there. + * - Give a thread specific context to tick functions, such that one class + * instance can be used for all threads. + * - Hide thread functionality for starting, stopping and running. + * - Minimizes locking by using a single lock that is taken only once per + * tick loop. + */ +#pragma once + +#include <memory> +#include <vespa/storageframework/generic/clock/time.h> +#include <vespa/vespalib/stllike/string.h> + +namespace storage::framework { + +struct ThreadPool; +using ThreadIndex = uint32_t; + +/** + * \brief Information returned from tick functions to indicate whether thread + * should throttle a bit or not. + */ +class ThreadWaitInfo { + bool _waitWanted; + explicit ThreadWaitInfo(bool waitBeforeNextTick) : _waitWanted(waitBeforeNextTick) {} + +public: + static ThreadWaitInfo MORE_WORK_ENQUEUED; + static ThreadWaitInfo NO_MORE_CRITICAL_WORK_KNOWN; + + void merge(const ThreadWaitInfo& other); + bool waitWanted() const noexcept { return _waitWanted; } +}; + +/** + * \brief Simple superclass to implement for ticking threads. + */ +struct TickingThread { + virtual ~TickingThread() = default; + + virtual ThreadWaitInfo doCriticalTick(ThreadIndex) = 0; + virtual ThreadWaitInfo doNonCriticalTick(ThreadIndex) = 0; + virtual void newThreadCreated(ThreadIndex) {} +}; + +/** \brief Delete to allow threads to tick again. */ +struct TickingLockGuard { + struct Impl { + virtual ~Impl() = default; + virtual void broadcast() = 0; + }; + explicit TickingLockGuard(std::unique_ptr<Impl> impl) : _impl(std::move(impl)) {} + void broadcast() { _impl->broadcast(); } +private: + std::unique_ptr<Impl> _impl; +}; + +struct ThreadLock { + virtual ~ThreadLock() = default; + virtual TickingLockGuard freezeAllTicks() = 0; + virtual TickingLockGuard freezeCriticalTicks() = 0; +}; + +/** + * \brief Thread pool set up by the application to control the threads. + */ +struct TickingThreadPool : public ThreadLock { + using UP = std::unique_ptr<TickingThreadPool>; + + // TODO STRIPE: Change waitTime default to 100ms when legacy mode is removed. + static TickingThreadPool::UP createDefault( + vespalib::stringref name, + vespalib::duration waitTime, + int ticksBeforeWait, + vespalib::duration maxProcessTime); + static TickingThreadPool::UP createDefault(vespalib::stringref name, vespalib::duration waitTime); + + ~TickingThreadPool() override = default; + + /** All threads must be added before starting the threads. */ + virtual void addThread(TickingThread& ticker) = 0; + /** Start all the threads added. */ + virtual void start(ThreadPool& pool) = 0; + virtual void stop() = 0; + virtual vespalib::string getStatus() = 0; +}; + +} diff --git a/storage/src/vespa/storageframework/storageframework.h b/storage/src/vespa/storageframework/storageframework.h new file mode 100644 index 00000000000..12d4f504d30 --- /dev/null +++ b/storage/src/vespa/storageframework/storageframework.h @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * + * This file includes the most common parts used by the framework. + */ + +#include <vespa/storageframework/generic/clock/clock.h> +#include <vespa/storageframework/generic/clock/timer.h> +#include <vespa/storageframework/generic/component/component.h> +#include <vespa/storageframework/generic/metric/metricupdatehook.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> +#include <vespa/storageframework/generic/status/statusreportermap.h> +#include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/storageframework/generic/thread/threadpool.h> + |