aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2017-05-09 15:44:01 +0200
committerGitHub <noreply@github.com>2017-05-09 15:44:01 +0200
commitc7efb4ecdde276e192e813bf52956cf6a0ca9c9c (patch)
tree0ce748833cd76f6fef9bffe4a479f3ac86da560f /searchlib
parent39c689d83745455d7105da09500eb7232fde45cd (diff)
parente97f8c7eef48f3e5c206ef01e745601ceff8bd28 (diff)
Merge pull request #2377 from yahoo/toregge/replace-task-id-with-executor-id-and-component-id-in-sequenced-task-executor
Toregge/replace task id with executor id and component id in sequenced task executor
Diffstat (limited to 'searchlib')
-rw-r--r--searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp63
-rw-r--r--searchlib/src/vespa/searchlib/common/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp28
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h11
-rw-r--r--searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h52
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp17
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h7
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp53
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h29
9 files changed, 205 insertions, 56 deletions
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 98436364ea0..e8183319fd4 100644
--- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -82,7 +82,7 @@ TEST_F("testExecute", Fixture) {
}
-TEST_F("require that task with same id are serialized", Fixture)
+TEST_F("require that task with same component id are serialized", Fixture)
{
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -96,7 +96,7 @@ TEST_F("require that task with same id are serialized", Fixture)
EXPECT_EQUAL(42, tv->_val);
}
-TEST_F("require that task with different ids are not serialized", Fixture)
+TEST_F("require that task with different component ids are not serialized", Fixture)
{
int tryCnt = 0;
for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
@@ -119,7 +119,7 @@ TEST_F("require that task with different ids are not serialized", Fixture)
}
-TEST_F("require that task with same string id are serialized", Fixture)
+TEST_F("require that task with same string component id are serialized", Fixture)
{
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
@@ -134,15 +134,17 @@ TEST_F("require that task with same string id are serialized", Fixture)
EXPECT_EQUAL(42, tv->_val);
}
-TEST_F("require that task with different string ids are not serialized",
- Fixture)
+namespace
+{
+
+int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
{
int tryCnt = 0;
- for (tryCnt = 0; tryCnt < 100; ++tryCnt) {
+ for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
EXPECT_EQUAL(0, tv->_val);
f._threads.execute("0", [=]() { usleep(2000); tv->modify(0, 14); });
- f._threads.execute("2", [=]() { tv->modify(14, 42); });
+ f._threads.execute(altComponentId, [=]() { tv->modify(14, 42); });
tv->wait(2);
if (tv->_fail != 1) {
continue;
@@ -154,10 +156,44 @@ TEST_F("require that task with different string ids are not serialized",
EXPECT_EQUAL(14, tv->_val);
break;
}
+ return tryCnt;
+}
+
+vespalib::string makeAltComponentId(Fixture &f)
+{
+ int tryCnt = 0;
+ char altComponentId[20];
+ uint32_t executorId0 = f._threads.getExecutorId("0");
+ for (tryCnt = 1; tryCnt < 100; ++tryCnt) {
+ sprintf(altComponentId, "%d", tryCnt);
+ if (f._threads.getExecutorId(altComponentId) == executorId0) {
+ break;
+ }
+ }
+ EXPECT_TRUE(tryCnt < 100);
+ return altComponentId;
+}
+
+}
+
+TEST_F("require that task with different string component ids are not serialized",
+ Fixture)
+{
+ int tryCnt = detectSerializeFailure(f, "2", 100);
EXPECT_TRUE(tryCnt < 100);
}
+TEST_F("require that task with different string component ids mapping to the same executor id are serialized",
+ Fixture)
+{
+ vespalib::string altComponentId = makeAltComponentId(f);
+ LOG(info, "second string component id is \"%s\"", altComponentId.c_str());
+ int tryCnt = detectSerializeFailure(f, altComponentId, 100);
+ EXPECT_TRUE(tryCnt == 100);
+}
+
+
TEST_F("require that execute works with const lambda", Fixture)
{
int i = 5;
@@ -187,6 +223,19 @@ TEST_F("require that execute works with reference to lambda", Fixture)
EXPECT_EQUAL(5, i);
}
+TEST_F("require that executeLambda works", Fixture)
+{
+ int i = 5;
+ std::vector<int> res;
+ const auto lambda = [i, &res]() mutable
+ { res.push_back(i--); res.push_back(i--); };
+ f._threads.executeLambda(0, lambda);
+ f._threads.sync();
+ std::vector<int> exp({5, 4});
+ EXPECT_EQUAL(exp, res);
+ EXPECT_EQUAL(5, i);
+}
+
} // namespace common
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
index 0399e0ddc5c..fd369581a80 100644
--- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
@@ -23,6 +23,7 @@ vespa_add_library(searchlib_common OBJECT
rcuvector.cpp
resultset.cpp
sequencedtaskexecutor.cpp
+ sequencedtaskexecutorobserver.cpp
serialnumfileheadercontext.cpp
sort.cpp
sortdata.cpp
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
index 2c50f20df30..595c91f9637 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
@@ -12,15 +12,14 @@ using vespalib::ThreadStackExecutor;
namespace search
{
-namespace
+ForegroundTaskExecutor::ForegroundTaskExecutor()
+ : ForegroundTaskExecutor(1)
{
-
-constexpr uint32_t stackSize = 128 * 1024;
-
}
-
-ForegroundTaskExecutor::ForegroundTaskExecutor()
+ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
+ : _threads(threads),
+ _ids()
{
}
@@ -28,12 +27,23 @@ ForegroundTaskExecutor::~ForegroundTaskExecutor()
{
}
+uint32_t
+ForegroundTaskExecutor::getExecutorId(uint64_t componentId)
+{
+ auto itr = _ids.find(componentId);
+ if (itr == _ids.end()) {
+ auto insarg = std::make_pair(componentId, _ids.size() % _threads);
+ auto insres = _ids.insert(insarg);
+ assert(insres.second);
+ itr = insres.first;
+ }
+ return itr->second;
+}
void
-ForegroundTaskExecutor::executeTask(uint64_t id,
- vespalib::Executor::Task::UP task)
+ForegroundTaskExecutor::executeTask(uint32_t executorId, vespalib::Executor::Task::UP task)
{
- (void) id;
+ assert(executorId < _threads);
task->run();
}
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
index ee481f5e496..cb3745de501 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h
@@ -2,6 +2,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/stllike/hash_map.h>
namespace vespalib
{
@@ -21,13 +22,19 @@ namespace search
*/
class ForegroundTaskExecutor : public ISequencedTaskExecutor
{
+ const uint32_t _threads;
+ vespalib::hash_map<size_t, uint32_t> _ids;
public:
+ using ISequencedTaskExecutor::getExecutorId;
+
ForegroundTaskExecutor();
+ ForegroundTaskExecutor(uint32_t threads);
~ForegroundTaskExecutor();
- virtual void executeTask(uint64_t id,
- vespalib::Executor::Task::UP task) override;
+ virtual uint32_t getExecutorId(uint64_t componentId) override;
+
+ virtual void executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) override;
virtual void sync() override;
};
diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
index f978cb30ff5..f1c0073dd45 100644
--- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h
@@ -18,16 +18,41 @@ public:
virtual ~ISequencedTaskExecutor() { }
/**
+ * Calculate which executor will handle an component. All callers
+ * must be in the same thread.
+ *
+ * @param componentId component id
+ * @return executor id
+ */
+ virtual uint32_t getExecutorId(uint64_t componentId) = 0;
+
+ inline uint32_t getExecutorId(vespalib::stringref componentId) {
+ vespalib::hash<vespalib::stringref> hashfun;
+ return getExecutorId(hashfun(componentId));
+ }
+
+ /**
* Schedule a task to run after all previously scheduled tasks with
- * same id. All tasks must be scheduled from same thread.
+ * same id.
*
- * @param id task id.
+ * @param executorId which internal executor to use
* @param task unique pointer to the task to be executed
*/
- virtual void executeTask(uint64_t id,
- vespalib::Executor::Task::UP task) = 0;
+ virtual void executeTask(uint32_t exeucutorId, vespalib::Executor::Task::UP task) = 0;
/**
+ * Wrap lambda function into a task and schedule it to be run.
+ * Caller must ensure that pointers and references are valid and
+ * call sync before tearing down pointed to/referenced data.
+ *
+ * @param executorId which internal executor to use
+ * @param function function to be wrapped in a task and later executed
+ */
+ template <class FunctionType>
+ inline void executeLambda(uint32_t executorId, FunctionType &&function) {
+ executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function)));
+ }
+ /**
* Wait for all scheduled tasks to complete.
*/
virtual void sync() = 0;
@@ -38,12 +63,13 @@ public:
* call sync before tearing down pointed to/referenced data.
* All tasks must be scheduled from same thread.
*
- * @param id task id.
- * @param function function to be wrapped in a task and later executed
+ * @param componentId component id
+ * @param function function to be wrapped in a task and later executed
*/
template <class FunctionType>
- inline void execute(uint64_t id, FunctionType &&function) {
- executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
+ inline void execute(uint64_t componentId, FunctionType &&function) {
+ uint32_t executorId = getExecutorId(componentId);
+ executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
@@ -52,13 +78,13 @@ public:
* call sync before tearing down pointed to/referenced data.
* All tasks must be scheduled from same thread.
*
- * @param id task id.
- * @param function function to be wrapped in a task and later executed
+ * @param componentId component id
+ * @param function function to be wrapped in a task and later executed
*/
template <class FunctionType>
- inline void execute(const vespalib::stringref id, FunctionType &&function) {
- vespalib::hash<vespalib::stringref> hashfun;
- executeTask(hashfun(id),
+ inline void execute(vespalib::stringref componentId, FunctionType &&function) {
+ uint32_t executorId = getExecutorId(componentId);
+ executeTask(executorId,
makeLambdaTask(std::forward<FunctionType>(function)));
}
};
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 00398ef684d..062853f1275 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -35,18 +35,23 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit)
}
}
-void
-SequencedTaskExecutor::executeTask(uint64_t id,
- vespalib::Executor::Task::UP task)
+uint32_t
+SequencedTaskExecutor::getExecutorId(uint64_t componentId)
{
- auto itr = _ids.find(id);
+ auto itr = _ids.find(componentId);
if (itr == _ids.end()) {
- auto insarg = std::make_pair(id, _ids.size() % _executors.size());
+ auto insarg = std::make_pair(componentId, _ids.size() % _executors.size());
auto insres = _ids.insert(insarg);
assert(insres.second);
itr = insres.first;
}
- size_t executorId = itr->second;
+ return itr->second;
+}
+
+void
+SequencedTaskExecutor::executeTask(uint32_t executorId, vespalib::Executor::Task::UP task)
+{
+ assert(executorId < _executors.size());
vespalib::ThreadStackExecutorBase &executor(*_executors[executorId]);
auto rejectedTask = executor.execute(std::move(task));
assert(!rejectedTask);
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
index 4f20c5b14a4..acfec837309 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h
@@ -24,14 +24,17 @@ class SequencedTaskExecutor : public ISequencedTaskExecutor
std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors;
vespalib::hash_map<size_t, size_t> _ids;
public:
+ using ISequencedTaskExecutor::getExecutorId;
+
SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000);
~SequencedTaskExecutor();
void setTaskLimit(uint32_t taskLimit);
- virtual void executeTask(uint64_t id,
- vespalib::Executor::Task::UP task) override;
+ virtual uint32_t getExecutorId(uint64_t componentId) override;
+
+ virtual void executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) override;
virtual void sync() override;
};
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
new file mode 100644
index 00000000000..b0a8fb05e41
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp
@@ -0,0 +1,53 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "sequencedtaskexecutorobserver.h"
+
+namespace search
+{
+
+SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor)
+ : _executor(executor),
+ _executeCnt(0u),
+ _syncCnt(0u),
+ _executeHistory(),
+ _mutex()
+{
+}
+
+SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver()
+{
+}
+
+uint32_t
+SequencedTaskExecutorObserver::getExecutorId(uint64_t componentId)
+{
+ return _executor.getExecutorId(componentId);
+}
+
+void
+SequencedTaskExecutorObserver::executeTask(uint32_t executorId,
+ vespalib::Executor::Task::UP task)
+{
+ ++_executeCnt;
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ _executeHistory.emplace_back(executorId);
+ }
+ _executor.executeTask(executorId, std::move(task));
+}
+
+void
+SequencedTaskExecutorObserver::sync()
+{
+ ++_syncCnt;
+ _executor.sync();
+}
+
+std::vector<uint32_t>
+SequencedTaskExecutorObserver::getExecuteHistory()
+{
+ std::lock_guard<std::mutex> guard(_mutex);
+ return _executeHistory;
+}
+
+} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
index ffc6ba7f55b..12a84d0bd81 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h
@@ -3,6 +3,8 @@
#include "isequencedtaskexecutor.h"
#include <atomic>
+#include <vector>
+#include <mutex>
namespace search
{
@@ -16,29 +18,22 @@ class SequencedTaskExecutorObserver : public ISequencedTaskExecutor
ISequencedTaskExecutor &_executor;
std::atomic<uint32_t> _executeCnt;
std::atomic<uint32_t> _syncCnt;
+ std::vector<uint32_t> _executeHistory;
+ std::mutex _mutex;
public:
- SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor)
- : _executor(executor),
- _executeCnt(0u),
- _syncCnt(0u)
- {
- }
+ using ISequencedTaskExecutor::getExecutorId;
- virtual ~SequencedTaskExecutorObserver() { }
+ SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor);
- virtual void executeTask(uint64_t id,
- vespalib::Executor::Task::UP task) override {
- ++_executeCnt;
- _executor.executeTask(id, std::move(task));
- }
-
- virtual void sync() override {
- ++_syncCnt;
- _executor.sync();
- }
+ virtual ~SequencedTaskExecutorObserver() override;
+ virtual uint32_t getExecutorId(uint64_t componentId) override;
+ virtual void executeTask(uint32_t executorId,
+ vespalib::Executor::Task::UP task) override;
+ virtual void sync() override;
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }
+ std::vector<uint32_t> getExecuteHistory();
};
} // namespace search