aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-16 09:32:15 +0100
committerGitHub <noreply@github.com>2021-11-16 09:32:15 +0100
commitc32200f74e6292cd939234778fa09ae408eb47b3 (patch)
tree617f0fffc7abbbde4964c177ca2ea1a6975d71f5 /staging_vespalib
parent0f24b425434161d031f37b3665b9b3bad0af3f18 (diff)
parent9e1de33fc99ea60f4b3f43356d0815ff209f049e (diff)
Merge pull request #20001 from vespa-engine/balder/perfect-with-few-sane-with-many
Add a fixed size table of 8 * num_exutors with 16 bit entries. Use th…
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp85
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h20
3 files changed, 84 insertions, 32 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 647bcf9bcee..4dded5d90ce 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -242,11 +242,16 @@ TEST("require that you get correct number of executors") {
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
+ const uint32_t NUM_EXACT = 8 * seven->getNumExecutors();
EXPECT_EQUAL(7u, seven->getNumExecutors());
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
+ if (id < NUM_EXACT) {
+ EXPECT_EQUAL(id % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ } else {
+ EXPECT_EQUAL(((id - NUM_EXACT) % 97) % seven->getNumExecutors(), seven->getExecutorId(id).getId());
+ }
}
EXPECT_EQUAL(97u, seq.getComponentHashSize());
EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize());
@@ -272,8 +277,8 @@ TEST("require that similar names gets 7/8 unique ids with 8 executors") {
EXPECT_EQUAL(3u, four->getExecutorIdFromName("f4").getId());
EXPECT_EQUAL(4u, four->getExecutorIdFromName("f5").getId());
EXPECT_EQUAL(5u, four->getExecutorIdFromName("f6").getId());
- EXPECT_EQUAL(2u, four->getExecutorIdFromName("f7").getId());
- EXPECT_EQUAL(6u, four->getExecutorIdFromName("f8").getId());
+ EXPECT_EQUAL(6u, four->getExecutorIdFromName("f7").getId());
+ EXPECT_EQUAL(7u, four->getExecutorIdFromName("f8").getId());
}
TEST("Test creation of different types") {
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index f92e1655e7d..954a63978f3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -14,6 +14,8 @@ namespace {
constexpr uint32_t stackSize = 128_Ki;
constexpr uint8_t MAGIC = 255;
+constexpr uint32_t NUM_PERFECT_PER_EXECUTOR = 8;
+constexpr uint16_t INVALID_KEY = 0x8000;
bool
isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & executors) {
@@ -25,6 +27,16 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex
return true;
}
+ssize_t
+find(uint16_t key, const uint16_t values[], size_t numValues) {
+ for (size_t i(0); i < numValues; i++) {
+ if (key == values[i]) return i;
+ if (INVALID_KEY == values[i]) return -1;
+ }
+ return -1;
+}
+
+
}
std::unique_ptr<ISequencedTaskExecutor>
@@ -35,14 +47,14 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre
size_t num_strands = std::min(taskLimit, threads*32);
return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
} else {
- auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
- executors->reserve(threads);
+ auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>();
+ executors.reserve(threads);
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 2 : kindOfWatermark;
- executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
} else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
@@ -54,21 +66,26 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
sync_all();
}
-SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
- : ISequencedTaskExecutor(executors->size()),
+SequencedTaskExecutor::SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executors)
+ : ISequencedTaskExecutor(executors.size()),
_executors(std::move(executors)),
- _lazyExecutors(isLazy(*_executors)),
- _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
+ _lazyExecutors(isLazy(_executors)),
+ _component2IdPerfect(std::make_unique<PerfectKeyT[]>(getNumExecutors()*NUM_PERFECT_PER_EXECUTOR)),
+ _component2IdImperfect(vespalib::hashtable_base::getModuloStl(getNumExecutors()*NUM_PERFECT_PER_EXECUTOR), MAGIC),
_mutex(),
_nextId(0)
{
assert(getNumExecutors() < 256);
+
+ for (size_t i(0); i < getNumExecutors() * NUM_PERFECT_PER_EXECUTOR; i++) {
+ _component2IdPerfect[i] = INVALID_KEY;
+ }
}
void
SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
{
- for (const auto &executor : *_executors) {
+ for (const auto &executor : _executors) {
executor->setTaskLimit(taskLimit);
}
}
@@ -76,15 +93,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
void
SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
{
- assert(id.getId() < _executors->size());
- auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task));
+ assert(id.getId() < _executors.size());
+ auto rejectedTask = _executors[id.getId()]->execute(std::move(task));
assert(!rejectedTask);
}
void
SequencedTaskExecutor::sync_all() {
wakeup();
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
executor->sync();
}
}
@@ -92,7 +109,7 @@ SequencedTaskExecutor::sync_all() {
void
SequencedTaskExecutor::wakeup() {
if (_lazyExecutors) {
- for (auto &executor : *_executors) {
+ for (auto &executor : _executors) {
//Enforce parallel wakeup of napping executors.
executor->wakeup();
}
@@ -103,7 +120,7 @@ ExecutorStats
SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
- for (auto &executor :* _executors) {
+ for (auto &executor : _executors) {
accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
@@ -111,15 +128,41 @@ SequencedTaskExecutor::getStats()
ISequencedTaskExecutor::ExecutorId
SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
- uint32_t shrunkId = componentId % _component2Id.size();
- uint8_t executorId = _component2Id[shrunkId];
+ auto id = getExecutorIdPerfect(componentId);
+ return id ? id.value() : getExecutorIdImPerfect(componentId);
+}
+
+std::optional<ISequencedTaskExecutor::ExecutorId>
+SequencedTaskExecutor::getExecutorIdPerfect(uint64_t componentId) const {
+ PerfectKeyT key = componentId & 0x7fff;
+ ssize_t pos = find(key, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR);
+ if (pos < 0) {
+ std::unique_lock guard(_mutex);
+ pos = find(key, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR);
+ if (pos < 0) {
+ pos = find(INVALID_KEY, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR);
+ if (pos >= 0) {
+ _component2IdPerfect[pos] = key;
+ } else {
+ // There was a race for the last spots
+ return std::optional<ISequencedTaskExecutor::ExecutorId>();
+ }
+ }
+ }
+ return std::optional<ISequencedTaskExecutor::ExecutorId>(ExecutorId(pos % getNumExecutors()));
+}
+
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const {
+ uint32_t shrunkId = componentId % _component2IdImperfect.size();
+ uint8_t executorId = _component2IdImperfect[shrunkId];
if (executorId == MAGIC) {
std::lock_guard guard(_mutex);
- if (_component2Id[shrunkId] == MAGIC) {
- _component2Id[shrunkId] = _nextId % getNumExecutors();
+ if (_component2IdImperfect[shrunkId] == MAGIC) {
+ _component2IdImperfect[shrunkId] = _nextId % getNumExecutors();
_nextId++;
}
- executorId = _component2Id[shrunkId];
+ executorId = _component2IdImperfect[shrunkId];
}
return ExecutorId(executorId);
}
@@ -127,10 +170,10 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
const vespalib::SyncableThreadExecutor*
SequencedTaskExecutor::first_executor() const
{
- if (_executors->empty()) {
+ if (_executors.empty()) {
return nullptr;
}
- return _executors->front().get();
+ return _executors.front().get();
}
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 245d6d29780..06e7fa65ac2 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -38,18 +38,22 @@ public:
/**
* For testing only
*/
- uint32_t getComponentHashSize() const { return _component2Id.size(); }
+ uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); }
uint32_t getComponentEffectiveHashSize() const { return _nextId; }
const vespalib::SyncableThreadExecutor* first_executor() const;
private:
- explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
-
- std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
- const bool _lazyExecutors;
- mutable std::vector<uint8_t> _component2Id;
- mutable std::mutex _mutex;
- mutable uint32_t _nextId;
+ explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor);
+ std::optional<ExecutorId> getExecutorIdPerfect(uint64_t componentId) const;
+ ExecutorId getExecutorIdImPerfect(uint64_t componentId) const;
+
+ std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> _executors;
+ using PerfectKeyT = uint16_t;
+ const bool _lazyExecutors;
+ mutable std::unique_ptr<PerfectKeyT[]> _component2IdPerfect;
+ mutable std::vector<uint8_t> _component2IdImperfect;
+ mutable std::mutex _mutex;
+ mutable uint32_t _nextId;
};