summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-11 11:39:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-13 16:59:07 +0000
commitd6c99a96fc8d3d2ced331998a81a26beb6acc825 (patch)
tree3424c0d564a1703710b3b506fdcf107eca079080
parent5cb9b85ffb3ec2a77caaa4ee367ad6435bbf762a (diff)
Add a fixed size table of 8 * num_exutors with 16 bit entries. Use this for mapping the first components exact.
With more components than 8x we fall abck to to using shrunk id of 8 bits as before. This enables perfect distribution for the first 8x and then 'good enough' for the rest. The more there are the less impact of imperfect distribution will be.
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp69
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h20
3 files changed, 68 insertions, 32 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 647bcf9bcee..4dded5d90ce 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -242,11 +242,16 @@ TEST("require that you get correct number of executors") {
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
+ const uint32_t NUM_EXACT = 8 * seven->getNumExecutors();
EXPECT_EQUAL(7u, seven->getNumExecutors());
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
+ if (id < NUM_EXACT) {
+ EXPECT_EQUAL(id % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ } else {
+ EXPECT_EQUAL(((id - NUM_EXACT) % 97) % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ }
}
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize());
@@ -272,8 +277,8 @@ TEST("require that similar names gets 7/8 unique ids with 8 executors") {
EXPECT_EQUAL(3u, four->getExecutorIdFromName("f4").getId());
EXPECT_EQUAL(4u, four->getExecutorIdFromName("f5").getId());
EXPECT_EQUAL(5u, four->getExecutorIdFromName("f6").getId());
- EXPECT_EQUAL(2u, four->getExecutorIdFromName("f7").getId());
- EXPECT_EQUAL(6u, four->getExecutorIdFromName("f8").getId());
+ EXPECT_EQUAL(6u, four->getExecutorIdFromName("f7").getId());
+ EXPECT_EQUAL(7u, four->getExecutorIdFromName("f8").getId());
}
TEST("Test creation of different types") {
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index f92e1655e7d..685f670b164 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -35,14 +35,14 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre
size_t num_strands = std::min(taskLimit, threads*32);
return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
} else {
- auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
- executors->reserve(threads);
+ auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>();
+ executors.reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 2 : kindOfWatermark;
- executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
} else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
@@ -54,21 +54,23 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
sync_all();
}
-SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
- : ISequencedTaskExecutor(executors->size()),
+SequencedTaskExecutor::SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executors)
+ : ISequencedTaskExecutor(executors.size()),
_executors(std::move(executors)),
- _lazyExecutors(isLazy(*_executors)),
- _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
+ _lazyExecutors(isLazy(_executors)),
+ _component2IdPerfect(),
+ _component2IdImperfect(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
_mutex(),
_nextId(0)
{
assert(getNumExecutors() < 256);
+ _component2IdPerfect.reserve(getNumExecutors()*8);
}
void
SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
{
- for (const auto &executor : *_executors) {
+ for (const auto &executor : _executors) {
executor->setTaskLimit(taskLimit);
}
}
@@ -76,15 +78,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
void
SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
{
- assert(id.getId() < _executors->size());
- auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
+ assert(id.getId() < _executors.size());
+ auto rejectedTask = _executors[id.getId()]->execute(std::move(task));
assert(!rejectedTask);
}
void
SequencedTaskExecutor::sync_all() {
wakeup();
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
executor->sync();
}
}
@@ -92,7 +94,7 @@ SequencedTaskExecutor::sync_all() {
void
SequencedTaskExecutor::wakeup() {
if (_lazyExecutors) {
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
//Enforce parallel wakeup of napping executors.
executor->wakeup();
}
@@ -103,7 +105,7 @@ ExecutorStats
SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
- for (auto &executor :* _executors) {
+ for (auto &executor : _executors) {
accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
@@ -111,15 +113,40 @@ SequencedTaskExecutor::getStats()
ISequencedTaskExecutor::ExecutorId
SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
- uint32_t shrunkId = componentId % _component2Id.size();
- uint8_t executorId = _component2Id[shrunkId];
+ PerfectKeyT key = componentId;
+ auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key);
+ if (found != _component2IdPerfect.end()) {
+ return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors());
+ } else if (_component2IdPerfect.size() < _component2IdPerfect.capacity()) {
+ return getExecutorIdPerfect(componentId);
+ } else {
+ return getExecutorIdImPerfect(componentId);
+ }
+}
+
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorIdPerfect(uint64_t componentId) const {
+ PerfectKeyT key = componentId;
+ std::lock_guard guard(_mutex);
+ auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key);
+ if (found == _component2IdPerfect.end()) {
+ _component2IdPerfect.push_back(key);
+ found = _component2IdPerfect.end() - 1;
+ }
+ return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors());
+}
+
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const {
+ uint32_t shrunkId = componentId % _component2IdImperfect.size();
+ uint8_t executorId = _component2IdImperfect[shrunkId];
if (executorId == MAGIC) {
std::lock_guard guard(_mutex);
- if (_component2Id[shrunkId] == MAGIC) {
- _component2Id[shrunkId] = _nextId % getNumExecutors();
+ if (_component2IdImperfect[shrunkId] == MAGIC) {
+ _component2IdImperfect[shrunkId] = _nextId % getNumExecutors();
_nextId++;
}
- executorId = _component2Id[shrunkId];
+ executorId = _component2IdImperfect[shrunkId];
}
return ExecutorId(executorId);
}
@@ -127,10 +154,10 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
const vespalib::SyncableThreadExecutor*
SequencedTaskExecutor::first_executor() const
{
- if (_executors->empty()) {
+ if (_executors.empty()) {
return nullptr;
}
- return _executors->front().get();
+ return _executors.front().get();
}
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 245d6d29780..660bc74472b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -38,18 +38,22 @@ public:
/**
* For testing only
*/
- uint32_t getComponentHashSize() const { return _component2Id.size(); }
+ uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
const vespalib::SyncableThreadExecutor* first_executor() const;
private:
- explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
-
- std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
- const bool _lazyExecutors;
- mutable std::vector<uint8_t> _component2Id;
- mutable std::mutex _mutex;
- mutable uint32_t _nextId;
+ explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor);
+ ExecutorId getExecutorIdPerfect(uint64_t componentId) const;
+ ExecutorId getExecutorIdImPerfect(uint64_t componentId) const;
+
+ std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> _executors;
+ using PerfectKeyT = uint16_t;
+ const bool _lazyExecutors;
+ mutable std::vector<PerfectKeyT> _component2IdPerfect;
+ mutable std::vector<uint8_t> _component2IdImperfect;
+ mutable std::mutex _mutex;
+ mutable uint32_t _nextId;
};