summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-06-25 09:29:39 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-06-25 09:34:25 +0000
commit8004167e964b1eb12a67777ec9d70d57a5dbbfed (patch)
treeb7282fdac9b62926324ca0f3c19c8d21dd75319f /staging_vespalib
parent0680bf96a4bf17aec0b9fde98ac5369c0991f0fb (diff)
Let the executor create an executor id to its liking.
Taske full advantage over strands instead of being limited to threads.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp14
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp14
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h1
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp3
7 files changed, 27 insertions, 16 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
index 10f3f6089e3..9c06ecd3c8f 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
@@ -63,6 +63,8 @@ public:
}
};
+vespalib::stringref ZERO("0");
+
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -118,8 +120,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [&]() { tv->modify(14, 42); };
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId("0"), test2);
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); });
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
@@ -136,8 +138,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); });
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); });
+ f._threads.execute(f._threads.getExecutorIdFromName(altComponentId), [&]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -156,10 +158,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorIdFromName(ZERO);
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads.getExecutorId(altComponentId) == executorId0) {
+ if (f._threads.getExecutorIdFromName(altComponentId) == executorId0) {
break;
}
}
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 70d0f1c743d..6128386837d 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -65,6 +65,8 @@ public:
}
};
+vespalib::stringref ZERO("0");
+
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -120,8 +122,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [=]() { tv->modify(14, 42); };
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId("0"), test2);
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
@@ -138,8 +140,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorIdFromName(altComponentId), [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -158,10 +160,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorIdFromName(ZERO);
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads->getExecutorId(altComponentId) == executorId0) {
+ if (f._threads->getExecutorIdFromName(altComponentId) == executorId0) {
break;
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 50bc3b020a8..3e87749c794 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -256,6 +256,11 @@ AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor()
assert(_worker_stack.empty());
}
+ISequencedTaskExecutor::ExecutorId
+AdaptiveSequencedExecutor::getExecutorId(uint64_t component) const {
+ return ExecutorId(component % _strands.size());
+}
+
void
AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
{
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index bc3457a72ef..a4d3ac97758 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -117,6 +117,7 @@ public:
AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending);
~AdaptiveSequencedExecutor() override;
+ ExecutorId getExecutorId(uint64_t component) const override;
void executeTask(ExecutorId id, Task::UP task) override;
void sync() override;
void setTaskLimit(uint32_t task_limit) override;
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index d05702cc85b..f8f1f64fac5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -23,7 +23,7 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const {
+ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const {
vespalib::hash<vespalib::stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index cd2a6c6f0d8..034e1520b8d 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -37,10 +37,10 @@ public:
* @param componentId component id
* @return executor id
*/
- ExecutorId getExecutorId(uint64_t componentId) const;
+ virtual ExecutorId getExecutorId(uint64_t componentId) const;
uint32_t getNumExecutors() const { return _numExecutors; }
- ExecutorId getExecutorId(vespalib::stringref componentId) const;
+ ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
/**
* Schedule a task to run after all previously scheduled tasks with
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index a0c2f0ac237..0fd78d8dcf6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -18,7 +18,8 @@ std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
- return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit);
+ size_t num_strands = std::min(taskLimit, threads*32);
+ return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
} else {
auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
executors->reserve(threads);