diff options
13 files changed, 72 insertions, 15 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 934dcdb36e3..46a6419e924 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -33,6 +33,7 @@ public: void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { service = std::make_unique<ExecutorThreadingService>(_transport.shared(), _transport.transport(), + _transport.clock(), field_writer_executor.get(), nullptr, ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index 38314abd7e5..1b04415b78f 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -18,6 +18,7 @@ #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/searchlib/queryeval/fake_requestcontext.h> #include <set> using document::Document; diff --git a/searchcore/src/tests/proton/index/indexcollection_test.cpp b/searchcore/src/tests/proton/index/indexcollection_test.cpp index 70141f057bf..6f6fe3d4e41 100644 --- a/searchcore/src/tests/proton/index/indexcollection_test.cpp +++ b/searchcore/src/tests/proton/index/indexcollection_test.cpp @@ -5,7 +5,7 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> - +#include <vespa/vespalib/util/testclock.h> #include <vespa/log/log.h> LOG_SETUP("indexcollection_test"); @@ -43,6 +43,7 @@ public: std::shared_ptr<IndexSearchable> _source2; std::shared_ptr<IndexSearchable> _fusion_source; vespalib::ThreadStackExecutor _executor; + vespalib::TestClock _clock; std::shared_ptr<IndexSearchable> _warmup; void expect_searchable_can_be_appended(IndexCollection::UP collection) { @@ -76,7 +77,7 @@ public: } IndexCollection::UP create_warmup(const IndexCollection::SP& prev, const IndexCollection::SP& next) { - return std::make_unique<WarmupIndexCollection>(WarmupConfig(1s, false), prev, next, *_warmup, _executor, *this); + return std::make_unique<WarmupIndexCollection>(WarmupConfig(1s, false), prev, next, *_warmup, _executor, _clock.clock(), *this); } void warmupDone(std::shared_ptr<WarmupIndexCollection> current) override { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 37a17fecfc1..e4d75085027 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -186,7 +186,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _writeServiceConfig(configSnapshot->get_threading_service_config()), - _writeService(shared_service.shared(), shared_service.transport(), shared_service.field_writer(), + _writeService(shared_service.shared(), shared_service.transport(), shared_service.clock(), shared_service.field_writer(), &shared_service.invokeService(), _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index d6ecc6dd2d3..dd735c75d79 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -41,12 +41,14 @@ VESPA_THREAD_STACK_TAG(field_writer_executor) } -ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, FNET_Transport & transport, uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, transport, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, FNET_Transport & transport, + const vespalib::Clock & clock, uint32_t num_treads) + : ExecutorThreadingService(sharedExecutor, transport, clock, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) {} ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor, FNET_Transport & transport, + const vespalib::Clock & clock, vespalib::ISequencedTaskExecutor * field_writer, vespalib::InvokeService * invokerService, const ThreadingServiceConfig & cfg, @@ -54,6 +56,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx : _sharedExecutor(sharedExecutor), _transport(transport), + _clock(clock), _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)), _shared_field_writer(cfg.shared_field_writer()), _master_task_limit(cfg.master_task_limit()), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 77bb9042198..1179c88ef76 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -22,6 +22,7 @@ private: using Registration = std::unique_ptr<vespalib::IDestructorCallback>; vespalib::Executor & _sharedExecutor; FNET_Transport & _transport; + const vespalib::Clock & _clock; vespalib::ThreadStackExecutor _masterExecutor; ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer; std::atomic<uint32_t> _master_task_limit; @@ -43,10 +44,12 @@ public: /** * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, + const vespalib::Clock & clock, uint32_t num_treads = 1); ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, + const vespalib::Clock & clock, vespalib::ISequencedTaskExecutor* field_writer, vespalib::InvokeService * invokeService, const ThreadingServiceConfig& cfg, @@ -82,6 +85,7 @@ public: vespalib::ISequencedTaskExecutor &indexFieldWriter() override; vespalib::ISequencedTaskExecutor &attributeFieldWriter() override; FNET_Transport &transport() override { return _transport; } + const vespalib::Clock &clock() const override { return _clock; } ExecutorThreadingServiceStats getStats(); }; diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index f728ab5f025..16b466a2275 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -50,6 +50,7 @@ public: return _shared; } FNET_Transport & transport() override { return _service.transport(); } + const vespalib::Clock & clock() const override { return _service.clock(); } vespalib::ISequencedTaskExecutor &indexFieldInverter() override { return _indexFieldInverter; } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp index 43e267805da..0731a3429b1 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp @@ -5,13 +5,15 @@ #include <vespa/fastos/thread.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/testclock.h> #include <vespa/searchcore/proton/server/executorthreadingservice.h> namespace proton { Transport::Transport() : _threadPool(std::make_unique<FastOS_ThreadPool>(64_Ki)), - _transport(std::make_unique<FNET_Transport>()) + _transport(std::make_unique<FNET_Transport>()), + _clock(std::make_unique<vespalib::TestClock>()) { _transport->Start(_threadPool.get()); } @@ -20,6 +22,10 @@ Transport::~Transport() { shutdown(); } +const vespalib::Clock & +Transport::clock() const { + return _clock->clock(); +} void Transport::shutdown() { _transport->ShutDown(true); @@ -39,7 +45,7 @@ TransportAndExecutor::shutdown() { TransportAndExecutorService::TransportAndExecutorService(size_t num_threads) : TransportAndExecutor(num_threads), - _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport())) + _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport(), clock())) {} TransportAndExecutorService::~TransportAndExecutorService() = default; diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h index 81610193743..8ec4f50e3f0 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h @@ -5,6 +5,8 @@ class FastOS_ThreadPool; +namespace vespalib { class TestClock; } + namespace proton { class ExecutorThreadingService; @@ -18,10 +20,12 @@ public: virtual ~Transport(); FNET_Transport & transport() { return *_transport; } FastOS_ThreadPool & threadPool() { return *_threadPool; } + const vespalib::Clock & clock() const; virtual void shutdown(); private: std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<vespalib::TestClock> _clock; }; class TransportAndExecutor : public Transport { diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 1153a09d09f..34a561dd2e2 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -431,7 +431,8 @@ IndexMaintainer::swapInNewIndex(LockGuard & guard, LOG(debug, "Warming up a disk index."); indexes = std::make_shared<WarmupIndexCollection> (_warmupConfig, getLeaf(guard, _source_list, true), indexes, - static_cast<IDiskIndex &>(source), _ctx.getWarmupExecutor(), *this); + static_cast<IDiskIndex &>(source), _ctx.getWarmupExecutor(), + _ctx.getThreadingService().clock(), *this); } else { LOG(debug, "No warmup needed as it is a memory index that is mapped in."); } diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index 60997d6666b..c325d5ded11 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -5,7 +5,10 @@ class FNET_Transport; -namespace vespalib { class ISequencedTaskExecutor; } +namespace vespalib { + class ISequencedTaskExecutor; + class Clock; +} namespace searchcorespi::index { /** @@ -76,6 +79,7 @@ struct IThreadingService virtual vespalib::ThreadExecutor &summary() = 0; virtual vespalib::Executor &shared() = 0; virtual FNET_Transport &transport() = 0; + virtual const vespalib::Clock &clock() const = 0; virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0; virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0; virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0; diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp index b9cbdab1c0a..e6e7d777656 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp @@ -1,10 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #include "warmupindexcollection.h" #include "idiskindex.h" #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/query/tree/termnodes.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/stllike/hash_set.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/eval/eval/value.h> #include <thread> #include <vespa/log/log.h> @@ -31,12 +34,14 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig, ISearchableIndexCollection::SP next, IndexSearchable & warmup, vespalib::Executor & executor, + const vespalib::Clock & clock, IWarmupDone & warmupDone) : _warmupConfig(warmupConfig), _prev(std::move(prev)), _next(std::move(next)), _warmup(warmup), _executor(executor), + _clock(clock), _warmupDone(warmupDone), _warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()), _handledTerms(std::make_unique<FieldTermMap>()), @@ -223,12 +228,21 @@ WarmupIndexCollection::drainPending() { _pendingTasks.waitForZeroRefCount(); } +WarmupIndexCollection::WarmupRequestContext::WarmupRequestContext(const vespalib::Clock & clock) + : _doom(clock, vespalib::steady_time::max(), vespalib::steady_time::max(), false) +{} +WarmupIndexCollection::WarmupRequestContext::~WarmupRequestContext() = default; + +std::unique_ptr<vespalib::eval::Value> +WarmupIndexCollection::WarmupRequestContext::get_query_tensor(const vespalib::string&) const { + return {}; +} WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup) : _warmup(std::move(warmup)), _retainGuard(_warmup->_pendingTasks), _matchData(std::move(md)), _bluePrint(), - _requestContext() + _requestContext(warmup->_clock) { } diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h index b0b2952bee8..c2d70b4fd5c 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h @@ -4,10 +4,11 @@ #include "isearchableindexcollection.h" #include "warmupconfig.h" -#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/searchlib/attribute/attribute_blueprint_params.h> +#include <vespa/vespalib/util/doom.h> +#include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/vespalib/util/retain_guard.h> -#include <vespa/searchlib/queryeval/fake_requestcontext.h> namespace searchcorespi { @@ -34,6 +35,7 @@ public: ISearchableIndexCollection::SP next, IndexSearchable & warmup, vespalib::Executor & executor, + const vespalib::Clock & clock, IWarmupDone & warmupDone); ~WarmupIndexCollection() override; // Implements IIndexCollection @@ -70,8 +72,22 @@ public: void drainPending(); private: typedef search::fef::MatchData MatchData; - typedef search::queryeval::FakeRequestContext FakeRequestContext; typedef vespalib::Executor::Task Task; + class WarmupRequestContext : public IRequestContext { + using IAttributeVector = search::attribute::IAttributeVector; + using AttributeBlueprintParams = search::attribute::AttributeBlueprintParams; + public: + WarmupRequestContext(const vespalib::Clock & clock); + ~WarmupRequestContext() override; + const vespalib::Doom & getDoom() const override { return _doom; } + const IAttributeVector *getAttribute(const vespalib::string &) const override { return nullptr; } + const IAttributeVector *getAttributeStableEnum(const vespalib::string &) const override { return nullptr; } + std::unique_ptr<vespalib::eval::Value> get_query_tensor(const vespalib::string&) const override; + const AttributeBlueprintParams& get_attribute_blueprint_params() const override { return _params; } + private: + const vespalib::Doom _doom; + const AttributeBlueprintParams _params; + }; class WarmupTask : public Task { public: WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup); @@ -90,7 +106,7 @@ private: vespalib::RetainGuard _retainGuard; std::unique_ptr<MatchData> _matchData; Blueprint::UP _bluePrint; - FakeRequestContext _requestContext; + WarmupRequestContext _requestContext; }; void fireWarmup(Task::UP task); @@ -101,6 +117,7 @@ private: ISearchableIndexCollection::SP _next; IndexSearchable & _warmup; vespalib::Executor & _executor; + const vespalib::Clock & _clock; IWarmupDone & _warmupDone; vespalib::steady_time _warmupEndTime; std::mutex _lock; |