diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-14 14:59:05 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-14 14:59:05 +0200 |
commit | 66cd305337c44b84589fb91e95d3d9662f8e6c5e (patch) | |
tree | 0ee43a40ab1d8a0b2b6e1ba837c97a6bfa011834 | |
parent | beb641c2a972af3c992ce46407297e87c3d8e193 (diff) |
Limit the number of task s in flight to 2x number of threads in the pool.
7 files changed, 25 insertions, 16 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 47e41fcd1db..c938288c714 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -33,6 +33,7 @@ public: return *this; } virtual bool isCurrentThread() const override; + size_t getNumThreads() const override { return _executor.getNumThreads(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index a2f0724d396..905ec5ef07b 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -3,8 +3,7 @@ #include <vespa/searchcorespi/index/i_thread_service.h> -namespace proton { -namespace test { +namespace proton::test { class ThreadServiceObserver : public searchcorespi::index::IThreadService { @@ -38,9 +37,8 @@ public: virtual bool isCurrentThread() const override { return _service.isCurrentThread(); } -}; - -} // namespace test -} // namespace proton + size_t getNumThreads() const override { return _service.getNumThreads(); } +}; +} diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp index c823bacb10c..4fac42c1421 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.cpp +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.cpp @@ -4,16 +4,16 @@ #include "data_store_file_chunk_stats.h" #include "summaryexceptions.h" #include "randreaders.h" +#include <vespa/searchlib/util/filekit.h> +#include <vespa/searchlib/common/lambdatask.h> #include <vespa/vespalib/data/fileheader.h> #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/array.hpp> #include <vespa/vespalib/stllike/hash_map.hpp> -#include <vespa/searchlib/util/filekit.h> -#include <vespa/searchlib/common/lambdatask.h> -#include <vespa/vespalib/objects/nbostream.h> #include <vespa/fastos/file.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <future> #include <vespa/log/log.h> @@ -333,16 +333,15 @@ appendChunks(FixedParams * args, Chunk::UP chunk) } - void -FileChunk::appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, +FileChunk::appendTo(vespalib::ThreadExecutor & executor, const IGetLid & db, IWriteData & dest, uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress) { assert(frozen() || visitorProgress); vespalib::GenerationHandler::Guard lidReadGuard(db.getLidReadGuard()); assert(numChunks <= getNumChunks()); FixedParams fixedParams = {db, dest, lidReadGuard, getFileId().getId(), visitorProgress}; - vespalib::ThreadStackExecutor singleExecutor(1, 64*1024); + vespalib::BlockingThreadStackExecutor singleExecutor(1, 64*1024, executor.getNumThreads()*2); for (size_t chunkId(0); chunkId < numChunks; chunkId++) { std::promise<Chunk::UP> promisedChunk; std::future<Chunk::UP> futureChunk = promisedChunk.get_future(); diff --git a/searchlib/src/vespa/searchlib/docstore/filechunk.h b/searchlib/src/vespa/searchlib/docstore/filechunk.h index a7b6556a0a3..87dc2e018eb 100644 --- a/searchlib/src/vespa/searchlib/docstore/filechunk.h +++ b/searchlib/src/vespa/searchlib/docstore/filechunk.h @@ -18,7 +18,7 @@ class FastOS_FileInterface; namespace vespalib { class DataBuffer; class GenericHeader; - class Executor; + class ThreadExecutor; } namespace search { @@ -162,7 +162,7 @@ public: virtual bool frozen() const { return true; } const vespalib::string & getName() const { return _name; } void compact(const IGetLid & iGetLid); - void appendTo(vespalib::Executor & executor, const IGetLid & db, IWriteData & dest, + void appendTo(vespalib::ThreadExecutor & executor, const IGetLid & db, IWriteData & dest, uint32_t numChunks, IFileChunkVisitorProgress *visitorProgress); /** * Must be called after chunk has been created to allow correct diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h index 3ec19ea9a71..c202a0a2373 100644 --- a/vespalib/src/vespa/vespalib/util/threadexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h @@ -15,6 +15,11 @@ class ThreadExecutor : public Executor, public Syncable { public: + /** + * Get number of threads in the executor pool. + * @return number of threads in the pool + */ + virtual size_t getNumThreads() const = 0; }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 76557762479..21d1de2a29b 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -155,6 +155,10 @@ ThreadStackExecutorBase::start(uint32_t threads) } } +size_t ThreadStackExecutorBase::getNumThreads() const { + return _pool->GetNumStartedThreads(); +} + void ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit) { diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 4ea27a2bcde..ee142659027 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -98,7 +98,7 @@ private: void unblock(); }; - std::unique_ptr<FastOS_ThreadPool> _pool; + std::unique_ptr<FastOS_ThreadPool> _pool; Monitor _monitor; Stats _stats; Gate _executorCompletion; @@ -223,6 +223,8 @@ public: **/ void wait_for_task_count(uint32_t task_count); + size_t getNumThreads() const override; + /** * Shut down this executor. This will make this executor reject * all new tasks. |