diff options
Diffstat (limited to 'staging_vespalib')
8 files changed, 60 insertions, 22 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp index 10f3f6089e3..2ca49105610 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -63,6 +63,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -97,7 +99,7 @@ TEST_F("require that task with different component ids are not serialized", Fixt std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(2, [&]() { tv->modify(14, 42); }); + f._threads.execute(1, [&]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -118,8 +120,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [&]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -136,8 +138,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(altComponentId), [&]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -156,10 +158,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { + if (f._threads.getExecutorIdFromName(altComponentId) == executorId0) { break; } } @@ -236,13 +238,9 @@ TEST("require that you get correct number of executors") { TEST("require that you distribute well") { AdaptiveSequencedExecutor seven(7, 1, 0, 10); EXPECT_EQUAL(7u, seven.getNumExecutors()); - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); + EXPECT_EQUAL(id%7, seven.getExecutorId(id).getId()); } - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize()); } } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 70d0f1c743d..6128386837d 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -65,6 +65,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -120,8 +122,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [=]() { tv->modify(14, 42); }; - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId("0"), test2); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -138,8 +140,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(altComponentId), [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -158,10 +160,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads->getExecutorId(altComponentId) == executorId0) { + if (f._threads->getExecutorIdFromName(altComponentId) == executorId0) { break; } } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 50bc3b020a8..3e87749c794 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -256,6 +256,11 @@ AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() assert(_worker_stack.empty()); } +ISequencedTaskExecutor::ExecutorId +AdaptiveSequencedExecutor::getExecutorId(uint64_t component) const { + return ExecutorId(component % _strands.size()); +} + void AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) { diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index bc3457a72ef..a4d3ac97758 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -117,6 +117,7 @@ public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); ~AdaptiveSequencedExecutor() override; + ExecutorId getExecutorId(uint64_t component) const override; void executeTask(ExecutorId id, Task::UP task) override; void sync() override; void setTaskLimit(uint32_t task_limit) override; diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h new file mode 100644 index 00000000000..575552971fa --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -0,0 +1,31 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadexecutor.h> +#include <atomic> + +namespace vespalib { + +/** + * Implementation of the ThreadExecutor interface that runs all tasks in the foreground by the calling thread. + */ +class ForegroundThreadExecutor : public vespalib::ThreadExecutor { +private: + std::atomic<size_t> _accepted; + +public: + ForegroundThreadExecutor() : _accepted(0) { } + Task::UP execute(Task::UP task) override { + task->run(); + ++_accepted; + return Task::UP(); + } + size_t getNumThreads() const override { return 0; } + Stats getStats() override { + return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); + } + virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } +}; + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index d05702cc85b..f8f1f64fac5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -23,7 +23,7 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const { +ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { vespalib::hash<vespalib::stringref> hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index cd2a6c6f0d8..034e1520b8d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -37,10 +37,10 @@ public: * @param componentId component id * @return executor id */ - ExecutorId getExecutorId(uint64_t componentId) const; + virtual ExecutorId getExecutorId(uint64_t componentId) const; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorId(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; /** * Schedule a task to run after all previously scheduled tasks with diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index a0c2f0ac237..0fd78d8dcf6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -18,7 +18,8 @@ std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { - return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit); + size_t num_strands = std::min(taskLimit, threads*32); + return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit); } else { auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>(); executors->reserve(threads); |