aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-11 11:39:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-13 16:59:07 +0000
commitd6c99a96fc8d3d2ced331998a81a26beb6acc825 (patch)
tree3424c0d564a1703710b3b506fdcf107eca079080 /staging_vespalib
parent5cb9b85ffb3ec2a77caaa4ee367ad6435bbf762a (diff)
Add a fixed size table of 8 * num_exutors with 16 bit entries. Use this for mapping the first components exact.
With more components than 8x we fall abck to to using shrunk id of 8 bits as before. This enables perfect distribution for the first 8x and then 'good enough' for the rest. The more there are the less impact of imperfect distribution will be.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp69
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h20
3 files changed, 68 insertions, 32 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 647bcf9bcee..4dded5d90ce 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -242,11 +242,16 @@ TEST("require that you get correct number of executors") {
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
+ const uint32_t NUM_EXACT = 8 * seven->getNumExecutors();
EXPECT_EQUAL(7u, seven->getNumExecutors());
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
+ if (id < NUM_EXACT) {
+ EXPECT_EQUAL(id % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ } else {
+ EXPECT_EQUAL(((id - NUM_EXACT) % 97) % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ }
}
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize());
@@ -272,8 +277,8 @@ TEST("require that similar names gets 7/8 unique ids with 8 executors") {
EXPECT_EQUAL(3u, four->getExecutorIdFromName("f4").getId());
EXPECT_EQUAL(4u, four->getExecutorIdFromName("f5").getId());
EXPECT_EQUAL(5u, four->getExecutorIdFromName("f6").getId());
- EXPECT_EQUAL(2u, four->getExecutorIdFromName("f7").getId());
- EXPECT_EQUAL(6u, four->getExecutorIdFromName("f8").getId());
+ EXPECT_EQUAL(6u, four->getExecutorIdFromName("f7").getId());
+ EXPECT_EQUAL(7u, four->getExecutorIdFromName("f8").getId());
}
TEST("Test creation of different types") {
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index f92e1655e7d..685f670b164 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -35,14 +35,14 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre
size_t num_strands = std::min(taskLimit, threads*32);
return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
} else {
- auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
- executors->reserve(threads);
+ auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>();
+ executors.reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 2 : kindOfWatermark;
- executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
} else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
@@ -54,21 +54,23 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
sync_all();
}
-SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
- : ISequencedTaskExecutor(executors->size()),
+SequencedTaskExecutor::SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executors)
+ : ISequencedTaskExecutor(executors.size()),
_executors(std::move(executors)),
- _lazyExecutors(isLazy(*_executors)),
- _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
+ _lazyExecutors(isLazy(_executors)),
+ _component2IdPerfect(),
+ _component2IdImperfect(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
_mutex(),
_nextId(0)
{
assert(getNumExecutors() < 256);
+ _component2IdPerfect.reserve(getNumExecutors()*8);
}
void
SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
{
- for (const auto &executor : *_executors) {
+ for (const auto &executor : _executors) {
executor->setTaskLimit(taskLimit);
}
}
@@ -76,15 +78,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
void
SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
{
- assert(id.getId() < _executors->size());
- auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
+ assert(id.getId() < _executors.size());
+ auto rejectedTask = _executors[id.getId()]->execute(std::move(task));
assert(!rejectedTask);
}
void
SequencedTaskExecutor::sync_all() {
wakeup();
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
executor->sync();
}
}
@@ -92,7 +94,7 @@ SequencedTaskExecutor::sync_all() {
void
SequencedTaskExecutor::wakeup() {
if (_lazyExecutors) {
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
//Enforce parallel wakeup of napping executors.
executor->wakeup();
}
@@ -103,7 +105,7 @@ ExecutorStats
SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
- for (auto &executor :* _executors) {
+ for (auto &executor : _executors) {
accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
@@ -111,15 +113,40 @@ SequencedTaskExecutor::getStats()
ISequencedTaskExecutor::ExecutorId
SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
- uint32_t shrunkId = componentId % _component2Id.size();
- uint8_t executorId = _component2Id[shrunkId];
+ PerfectKeyT key = componentId;
+ auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key);
+ if (found != _component2IdPerfect.end()) {
+ return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors());
+ } else if (_component2IdPerfect.size() < _component2IdPerfect.capacity()) {
+ return getExecutorIdPerfect(componentId);
+ } else {
+ return getExecutorIdImPerfect(componentId);
+ }
+}
+
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorIdPerfect(uint64_t componentId) const {
+ PerfectKeyT key = componentId;
+ std::lock_guard guard(_mutex);
+ auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key);
+ if (found == _component2IdPerfect.end()) {
+ _component2IdPerfect.push_back(key);
+ found = _component2IdPerfect.end() - 1;
+ }
+ return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors());
+}
+
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const {
+ uint32_t shrunkId = componentId % _component2IdImperfect.size();
+ uint8_t executorId = _component2IdImperfect[shrunkId];
if (executorId == MAGIC) {
std::lock_guard guard(_mutex);
- if (_component2Id[shrunkId] == MAGIC) {
- _component2Id[shrunkId] = _nextId % getNumExecutors();
+ if (_component2IdImperfect[shrunkId] == MAGIC) {
+ _component2IdImperfect[shrunkId] = _nextId % getNumExecutors();
_nextId++;
}
- executorId = _component2Id[shrunkId];
+ executorId = _component2IdImperfect[shrunkId];
}
return ExecutorId(executorId);
}
@@ -127,10 +154,10 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
const vespalib::SyncableThreadExecutor*
SequencedTaskExecutor::first_executor() const
{
- if (_executors->empty()) {
+ if (_executors.empty()) {
return nullptr;
}
- return _executors->front().get();
+ return _executors.front().get();
}
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 245d6d29780..660bc74472b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -38,18 +38,22 @@ public:
/**
* For testing only
*/
- uint32_t getComponentHashSize() const { return _component2Id.size(); }
+ uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
const vespalib::SyncableThreadExecutor* first_executor() const;
private:
- explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
-
- std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
- const bool _lazyExecutors;
- mutable std::vector<uint8_t> _component2Id;
- mutable std::mutex _mutex;
- mutable uint32_t _nextId;
+ explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor);
+ ExecutorId getExecutorIdPerfect(uint64_t componentId) const;
+ ExecutorId getExecutorIdImPerfect(uint64_t componentId) const;
+
+ std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> _executors;
+ using PerfectKeyT = uint16_t;
+ const bool _lazyExecutors;
+ mutable std::vector<PerfectKeyT> _component2IdPerfect;
+ mutable std::vector<uint8_t> _component2IdImperfect;
+ mutable std::mutex _mutex;
+ mutable uint32_t _nextId;
};