summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-06-26 10:13:45 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-06-26 10:13:45 +0000
commit7e20842be3eec0f0881749098a2c944e5a1a258e (patch)
tree304bb5e1f5bcd785d33ad6fbf89d20632454b691 /staging_vespalib
parentbb367946be112361611f62fb6803c5060cfe9dde (diff)
Let getExecutorId be a pure interface.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp9
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h3
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp27
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp24
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h19
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h7
9 files changed, 60 insertions, 50 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 6128386837d..df94e70f9d6 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -239,14 +239,15 @@ TEST("require that you get correct number of executors") {
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(7);
+ const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
EXPECT_EQUAL(7u, seven->getNumExecutors());
- EXPECT_EQUAL(97u, seven->getComponentHashSize());
- EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize());
+ EXPECT_EQUAL(97u, seq.getComponentHashSize());
+ EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId());
}
- EXPECT_EQUAL(97u, seven->getComponentHashSize());
- EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize());
+ EXPECT_EQUAL(97u, seq.getComponentHashSize());
+ EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize());
}
TEST("Test creation of different types") {
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
index b45ada1c58c..f295d2b30c1 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -39,4 +39,9 @@ vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
}
+ISequencedTaskExecutor::ExecutorId
+ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const {
+ return ExecutorId(componentId%getNumExecutors());
+}
+
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
index d9a348ed012..f7b3ff8eab0 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
@@ -21,11 +21,10 @@ public:
ForegroundTaskExecutor(uint32_t threads);
~ForegroundTaskExecutor() override;
+ ExecutorId getExecutorId(uint64_t componentId) const override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
-
void setTaskLimit(uint32_t taskLimit) override;
-
vespalib::ExecutorStats getStats() override;
private:
std::atomic<uint64_t> _accepted;
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index f8f1f64fac5..af3ce5fe64f 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -2,22 +2,12 @@
#include "isequencedtaskexecutor.h"
#include <vespa/vespalib/stllike/hash_fun.h>
-#include <vespa/vespalib/stllike/hashtable.h>
-#include <cassert>
namespace vespalib {
-namespace {
- constexpr uint8_t MAGIC = 255;
-}
-
ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
- : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC),
- _mutex(),
- _numExecutors(numExecutors),
- _nextId(0)
+ : _numExecutors(numExecutors)
{
- assert(numExecutors < 256);
}
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
@@ -28,19 +18,4 @@ ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) c
return getExecutorId(hashfun(componentId));
}
-ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
- uint32_t shrunkId = componentId % _component2Id.size();
- uint8_t executorId = _component2Id[shrunkId];
- if (executorId == MAGIC) {
- std::lock_guard guard(_mutex);
- if (_component2Id[shrunkId] == MAGIC) {
- _component2Id[shrunkId] = _nextId % getNumExecutors();
- _nextId++;
- }
- executorId = _component2Id[shrunkId];
- }
- return ExecutorId(executorId);
-}
-
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 034e1520b8d..d457de26f54 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -37,7 +37,7 @@ public:
* @param componentId component id
* @return executor id
*/
- virtual ExecutorId getExecutorId(uint64_t componentId) const;
+ virtual ExecutorId getExecutorId(uint64_t componentId) const = 0;
uint32_t getNumExecutors() const { return _numExecutors; }
ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
@@ -98,16 +98,9 @@ public:
void execute(ExecutorId id, FunctionType &&function) {
executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
}
- /**
- * For testing only
- */
- uint32_t getComponentHashSize() const { return _component2Id.size(); }
- uint32_t getComponentEffectiveHashSize() const { return _nextId; }
+
private:
- mutable std::vector<uint8_t> _component2Id;
- mutable std::mutex _mutex;
uint32_t _numExecutors;
- mutable uint32_t _nextId;
};
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 0fd78d8dcf6..963264a62e7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -4,12 +4,15 @@
#include "adaptive_sequenced_executor.h"
#include "singleexecutor.h"
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/stllike/hashtable.h>
+#include <cassert>
namespace vespalib {
namespace {
constexpr uint32_t stackSize = 128 * 1024;
+constexpr uint8_t MAGIC = 255;
}
@@ -42,8 +45,12 @@ SequencedTaskExecutor::~SequencedTaskExecutor()
SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors)
: ISequencedTaskExecutor(executors->size()),
- _executors(std::move(executors))
+ _executors(std::move(executors)),
+ _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC),
+ _mutex(),
+ _nextId(0)
{
+ assert(getNumExecutors() < 256);
}
void
@@ -87,4 +94,19 @@ SequencedTaskExecutor::getStats()
return accumulatedStats;
}
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutor::getExecutorId(uint64_t componentId) const {
+ uint32_t shrunkId = componentId % _component2Id.size();
+ uint8_t executorId = _component2Id[shrunkId];
+ if (executorId == MAGIC) {
+ std::lock_guard guard(_mutex);
+ if (_component2Id[shrunkId] == MAGIC) {
+ _component2Id[shrunkId] = _nextId % getNumExecutors();
+ _nextId++;
+ }
+ executorId = _component2Id[shrunkId];
+ }
+ return ExecutorId(executorId);
+}
+
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index b3dd400478a..c37bd2eecf4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -14,11 +14,8 @@ class SyncableThreadExecutor;
*/
class SequencedTaskExecutor final : public ISequencedTaskExecutor
{
- using Stats = vespalib::ExecutorStats;
- std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
-
- SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
public:
+ using Stats = vespalib::ExecutorStats;
using ISequencedTaskExecutor::getExecutorId;
using OptimizeFor = vespalib::Executor::OptimizeFor;
@@ -26,6 +23,7 @@ public:
void setTaskLimit(uint32_t taskLimit) override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ ExecutorId getExecutorId(uint64_t componentId) const override;
void sync() override;
Stats getStats() override;
@@ -35,6 +33,19 @@ public:
*/
static std::unique_ptr<ISequencedTaskExecutor>
create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
+ /**
+ * For testing only
+ */
+ uint32_t getComponentHashSize() const { return _component2Id.size(); }
+ uint32_t getComponentEffectiveHashSize() const { return _nextId; }
+private:
+ SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor);
+
+ std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors;
+ mutable std::vector<uint8_t> _component2Id;
+ mutable std::mutex _mutex;
+ mutable uint32_t _nextId;
+
};
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
index 3d9ed4e21f4..d6a89117d68 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
@@ -49,4 +49,9 @@ vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
return _executor.getStats();
}
+ISequencedTaskExecutor::ExecutorId
+SequencedTaskExecutorObserver::getExecutorId(uint64_t componentId) const {
+ return _executor.getExecutorId(componentId);
+}
+
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
index 9307a7ddb37..6bcdf08ae5c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
@@ -23,16 +23,15 @@ public:
SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor);
~SequencedTaskExecutorObserver() override;
+ ExecutorId getExecutorId(uint64_t componentId) const override;
void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
void sync() override;
+ void setTaskLimit(uint32_t taskLimit) override;
+ vespalib::ExecutorStats getStats() override;
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
std::vector<uint32_t> getExecuteHistory();
-
- void setTaskLimit(uint32_t taskLimit) override;
-
- vespalib::ExecutorStats getStats() override;
};
} // namespace search