summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp22
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp14
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp5
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h1
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h31
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp3
8 files changed, 60 insertions, 22 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
index 10f3f6089e3..2ca49105610 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
@@ -63,6 +63,8 @@ public:
}
};
+vespalib::stringref ZERO("0");
+
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -97,7 +99,7 @@ TEST_F("require that task with different component ids are not serialized", Fixt
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(2, [&]() { tv->modify(14, 42); });
+ f._threads.execute(1, [&]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -118,8 +120,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [&]() { tv->modify(14, 42); };
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId("0"), test2);
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); });
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
@@ -136,8 +138,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); });
+ f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); });
+ f._threads.execute(f._threads.getExecutorIdFromName(altComponentId), [&]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -156,10 +158,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorIdFromName(ZERO);
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads.getExecutorId(altComponentId) == executorId0) {
+ if (f._threads.getExecutorIdFromName(altComponentId) == executorId0) {
break;
}
}
@@ -236,13 +238,9 @@ TEST("require that you get correct number of executors") {
TEST("require that you distribute well") {
AdaptiveSequencedExecutor seven(7, 1, 0, 10);
EXPECT_EQUAL(7u, seven.getNumExecutors());
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize());
for (uint32_t id=0; id < 1000; id++) {
- EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId());
+ EXPECT_EQUAL(id%7, seven.getExecutorId(id).getId());
}
- EXPECT_EQUAL(97u, seven.getComponentHashSize());
- EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize());
}
}
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 70d0f1c743d..6128386837d 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -65,6 +65,8 @@ public:
}
};
+vespalib::stringref ZERO("0");
+
TEST_F("testExecute", Fixture) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -120,8 +122,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
auto test2 = [=]() { tv->modify(14, 42); };
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId("0"), test2);
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), test2);
tv->wait(2);
EXPECT_EQUAL(0, tv->_fail);
EXPECT_EQUAL(42, tv->_val);
@@ -138,8 +140,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
- f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); });
+ f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); });
+ f._threads->execute(f._threads->getExecutorIdFromName(altComponentId), [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -158,10 +160,10 @@ vespalib::string makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
- ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0");
+ ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorIdFromName(ZERO);
for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
sprintf(altComponentId, "%d", tryCnt);
- if (f._threads->getExecutorId(altComponentId) == executorId0) {
+ if (f._threads->getExecutorIdFromName(altComponentId) == executorId0) {
break;
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 50bc3b020a8..3e87749c794 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -256,6 +256,11 @@ AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor()
assert(_worker_stack.empty());
}
+ISequencedTaskExecutor::ExecutorId
+AdaptiveSequencedExecutor::getExecutorId(uint64_t component) const {
+ return ExecutorId(component % _strands.size());
+}
+
void
AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
{
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index bc3457a72ef..a4d3ac97758 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -117,6 +117,7 @@ public:
AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
size_t max_waiting, size_t max_pending);
~AdaptiveSequencedExecutor() override;
+ ExecutorId getExecutorId(uint64_t component) const override;
void executeTask(ExecutorId id, Task::UP task) override;
void sync() override;
void setTaskLimit(uint32_t task_limit) override;
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
new file mode 100644
index 00000000000..575552971fa
--- /dev/null
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -0,0 +1,31 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <atomic>
+
+namespace vespalib {
+
+/**
+ * Implementation of the ThreadExecutor interface that runs all tasks in the foreground by the calling thread.
+ */
+class ForegroundThreadExecutor : public vespalib::ThreadExecutor {
+private:
+ std::atomic<size_t> _accepted;
+
+public:
+ ForegroundThreadExecutor() : _accepted(0) { }
+ Task::UP execute(Task::UP task) override {
+ task->run();
+ ++_accepted;
+ return Task::UP();
+ }
+ size_t getNumThreads() const override { return 0; }
+ Stats getStats() override {
+ return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ }
+ virtual void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
+};
+
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index d05702cc85b..f8f1f64fac5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -23,7 +23,7 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const {
+ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const {
vespalib::hash<vespalib::stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index cd2a6c6f0d8..034e1520b8d 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -37,10 +37,10 @@ public:
* @param componentId component id
* @return executor id
*/
- ExecutorId getExecutorId(uint64_t componentId) const;
+ virtual ExecutorId getExecutorId(uint64_t componentId) const;
uint32_t getNumExecutors() const { return _numExecutors; }
- ExecutorId getExecutorId(vespalib::stringref componentId) const;
+ ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
/**
* Schedule a task to run after all previously scheduled tasks with
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index a0c2f0ac237..0fd78d8dcf6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -18,7 +18,8 @@ std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
- return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit);
+ size_t num_strands = std::min(taskLimit, threads*32);
+ return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
} else {
auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>();
executors->reserve(threads);