summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-14 14:59:05 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-14 14:59:05 +0200
commit66cd305337c44b84589fb91e95d3d9662f8e6c5e (patch)
tree0ee43a40ab1d8a0b2b6e1ba837c97a6bfa011834
parentbeb641c2a972af3c992ce46407297e87c3d8e193 (diff)
Limit the number of task s in flight to 2x number of threads in the pool.
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h10
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.cpp13
-rw-r--r--searchlib/src/vespa/searchlib/docstore/filechunk.h4
-rw-r--r--vespalib/src/vespa/vespalib/util/threadexecutor.h5
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h4
7 files changed, 25 insertions, 16 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
index 47e41fcd1db..c938288c714 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
+++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h
@@ -33,6 +33,7 @@ public:
return *this;
}
virtual bool isCurrentThread() const override;
+ size_t getNumThreads() const override { return _executor.getNumThreads(); }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
index a2f0724d396..905ec5ef07b 100644
--- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
+++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h
@@ -3,8 +3,7 @@
#include <vespa/searchcorespi/index/i_thread_service.h>
-namespace proton {
-namespace test {
+namespace proton::test {
class ThreadServiceObserver : public searchcorespi::index::IThreadService
{
@@ -38,9 +37,8 @@ public:
virtual bool isCurrentThread() const override {
return _service.isCurrentThread();
}
-};
-
-} // namespace test
-} // namespace proton
+ size_t getNumThreads() const override { return _service.getNumThreads(); }
+};
+}
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
index c823bacb10c..4fac42c1421 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp
@@ -4,16 +4,16 @@
#include "data_store_file_chunk_stats.h"
#include "summaryexceptions.h"
#include "randreaders.h"
+#include <vespa/searchlib/util/filekit.h>
+#include <vespa/searchlib/common/lambdatask.h>
#include <vespa/vespalib/data/fileheader.h>
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/array.hpp>
#include <vespa/vespalib/stllike/hash_map.hpp>
-#include <vespa/searchlib/util/filekit.h>
-#include <vespa/searchlib/common/lambdatask.h>
-#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/fastos/file.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <future>
#include <vespa/log/log.h>
@@ -333,16 +333,15 @@ appendChunks(FixedParams * args, Chunk::UP chunk)
}
-
void
-FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
+FileChunk::appendTo(vespalib::ThreadExecutor & executor, const IGetLid & db, IWriteData & dest,
uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress)
{
assert(frozen() || visitorProgress);
vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard());
assert(numChunks <= getNumChunks());
FixedParams fixedParams = {db, dest, lidReadGuard, getFileId().getId(), visitorProgress};
- vespalib::ThreadStackExecutor singleExecutor(1, 64*1024);
+ vespalib::BlockingThreadStackExecutor singleExecutor(1, 64*1024, executor.getNumThreads()*2);
for (size_t chunkId(0); chunkId < numChunks; chunkId++) {
std::promise<Chunk::UP> promisedChunk;
std::future<Chunk::UP> futureChunk = promisedChunk.get_future();
diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h
index a7b6556a0a3..87dc2e018eb 100644
--- a/searchlib/src/vespa/searchlib/docstore/filechunk.h
+++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h
@@ -18,7 +18,7 @@ class FastOS_FileInterface;
namespace vespalib {
class DataBuffer;
class GenericHeader;
- class Executor;
+ class ThreadExecutor;
}
namespace search {
@@ -162,7 +162,7 @@ public:
virtual bool frozen() const { return true; }
const vespalib::string & getName() const { return _name; }
void compact(const IGetLid & iGetLid);
- void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest,
+ void appendTo(vespalib::ThreadExecutor & executor, const IGetLid & db, IWriteData & dest,
uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress);
/**
* Must be called after chunk has been created to allow correct
diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h
index 3ec19ea9a71..c202a0a2373 100644
--- a/vespalib/src/vespa/vespalib/util/threadexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h
@@ -15,6 +15,11 @@ class ThreadExecutor : public Executor,
public Syncable
{
public:
+ /**
+ * Get number of threads in the executor pool.
+ * @return number of threads in the pool
+ */
+ virtual size_t getNumThreads() const = 0;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 76557762479..21d1de2a29b 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -155,6 +155,10 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
+size_t ThreadStackExecutorBase::getNumThreads() const {
+ return _pool->GetNumStartedThreads();
+}
+
void
ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit)
{
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 4ea27a2bcde..ee142659027 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -98,7 +98,7 @@ private:
void unblock();
};
- std::unique_ptr<FastOS_ThreadPool> _pool;
+ std::unique_ptr<FastOS_ThreadPool> _pool;
Monitor _monitor;
Stats _stats;
Gate _executorCompletion;
@@ -223,6 +223,8 @@ public:
**/
void wait_for_task_count(uint32_t task_count);
+ size_t getNumThreads() const override;
+
/**
* Shut down this executor. This will make this executor reject
* all new tasks.