diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-26 09:00:38 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-26 10:05:48 +0000 |
commit | 1746d4a66d9696da6ae323c4532cbae4d5117498 (patch) | |
tree | 6020b5df907f4f68f35ea048581a7344eef58c9f /searchcorespi | |
parent | 1a9d505710061196383e649f0918cca7cd41a066 (diff) |
Avoid requiring a syncable executor for warmup.
Diffstat (limited to 'searchcorespi')
6 files changed, 44 insertions, 30 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 19469d59d1b..a2bd19c3d29 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -328,10 +328,8 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh); } IndexWriteUtilities::writeSourceSelector(saveInfo, indexId, getAttrTune(), - _ctx.getFileHeaderContext(), - serialNum); - IndexWriteUtilities::writeSerialNum(serialNum, flushDir, - _ctx.getFileHeaderContext()); + _ctx.getFileHeaderContext(), serialNum); + IndexWriteUtilities::writeSerialNum(serialNum, flushDir, _ctx.getFileHeaderContext()); return loadDiskIndex(flushDir); } @@ -696,7 +694,7 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) } bool -IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive) +IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive) { // called by warmupDone via reconfigurer, warmupDone() doesn't wait for us assert(_ctx.getThreadingService().master().isCurrentThread()); @@ -713,13 +711,13 @@ IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP LOG(info, "New index warmed up and switched in : %s", warmIndex->toString().c_str()); } LOG(info, "Sync warmupExecutor."); - _ctx.getWarmupExecutor().sync(); + keepAlive->drainPending(); LOG(info, "Now the keep alive of the warmupindexcollection should be gone."); return true; } void -IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current) +IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current) { // Called by a search thread LockGuard lock(_new_search_lock); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 6e4eb32ee50..8213c02b90c 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -257,8 +257,8 @@ class IndexMaintainer : public IIndexManager, * result. */ bool reconfigure(std::unique_ptr<Configure> configure); - void warmupDone(ISearchableIndexCollection::SP current) override; - bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive); + void warmupDone(std::shared_ptr<WarmupIndexCollection> current) override; + bool makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive); void commit_and_wait(); void commit(vespalib::Gate& gate); void pruneRemovedFields(const Schema &schema, SerialNum serialNum); diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp index 522789e7fe8..efd7827fc3d 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp @@ -11,7 +11,7 @@ namespace searchcorespi::index { IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService, IIndexManager::Reconfigurer &reconfigurer, const FileHeaderContext &fileHeaderContext, - vespalib::SyncableThreadExecutor & warmupExecutor) + vespalib::Executor & warmupExecutor) : _threadingService(threadingService), _reconfigurer(reconfigurer), _fileHeaderContext(fileHeaderContext), diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h index c90659c55bf..2c7aa4af48e 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h @@ -17,13 +17,13 @@ private: IThreadingService &_threadingService; IIndexManager::Reconfigurer &_reconfigurer; const search::common::FileHeaderContext &_fileHeaderContext; - vespalib::SyncableThreadExecutor & _warmupExecutor; + vespalib::Executor & _warmupExecutor; public: IndexMaintainerContext(IThreadingService &threadingService, IIndexManager::Reconfigurer &reconfigurer, const search::common::FileHeaderContext &fileHeaderContext, - vespalib::SyncableThreadExecutor & warmupExecutor); + vespalib::Executor & warmupExecutor); /** * Returns the treading service that encapsulates the thread model used for writing. @@ -49,7 +49,7 @@ public: /** * @return The executor that should be used for warmup. */ - vespalib::SyncableThreadExecutor & getWarmupExecutor() const { return _warmupExecutor; } + vespalib::Executor & getWarmupExecutor() const { return _warmupExecutor; } }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp index d6aba7c6ff1..0a24efd0e66 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp @@ -5,6 +5,7 @@ #include <vespa/searchlib/query/tree/termnodes.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/stllike/hash_set.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP(".searchcorespi.index.warmupindexcollection"); @@ -29,7 +30,7 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig, ISearchableIndexCollection::SP prev, ISearchableIndexCollection::SP next, IndexSearchable & warmup, - vespalib::SyncableThreadExecutor & executor, + vespalib::Executor & executor, IWarmupDone & warmupDone) : _warmupConfig(warmupConfig), _prev(std::move(prev)), @@ -38,7 +39,8 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig, _executor(executor), _warmupDone(warmupDone), _warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()), - _handledTerms(std::make_unique<FieldTermMap>()) + _handledTerms(std::make_unique<FieldTermMap>()), + _pendingTasks(0) { if (_next->valid()) { setCurrentIndex(_next->getCurrentIndex()); @@ -79,7 +81,7 @@ WarmupIndexCollection::~WarmupIndexCollection() if (_warmupEndTime != vespalib::steady_time()) { LOG(info, "Warmup aborted due to new state change or application shutdown"); } - _executor.sync(); + assert(_pendingTasks == 0); } const ISourceSelector & @@ -164,7 +166,7 @@ WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext, needWarmUp = needWarmUp || ! handledBefore(fs.getFieldId(), term); } if (needWarmUp) { - auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), *this); + auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), shared_from_this()); task->createBlueprint(fsl, term); fireWarmup(std::move(task)); } @@ -216,25 +218,36 @@ WarmupIndexCollection::getSearchableSP(uint32_t i) const return _next->getSearchableSP(i); } -WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, WarmupIndexCollection & warmup) - : _warmup(warmup), +void +WarmupIndexCollection::drainPending() { + while (_pendingTasks > 0) { + std::this_thread::sleep_for(1ms); + } +} + +WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup) + : _warmup(std::move(warmup)), _matchData(std::move(md)), _bluePrint(), _requestContext() -{ } +{ + _warmup->_pendingTasks++; +} -WarmupIndexCollection::WarmupTask::~WarmupTask() = default; +WarmupIndexCollection::WarmupTask::~WarmupTask() { + _warmup->_pendingTasks--; +} void WarmupIndexCollection::WarmupTask::run() { - if (_warmup._warmupEndTime != vespalib::steady_time()) { + if (_warmup->_warmupEndTime != vespalib::steady_time()) { LOG(debug, "Warming up %s", _bluePrint->asString().c_str()); _bluePrint->fetchPostings(search::queryeval::ExecuteInfo::TRUE); SearchIterator::UP it(_bluePrint->createSearch(*_matchData, true)); it->initFullRange(); for (uint32_t docId = it->seekFirst(1); !it->isAtEnd(); docId = it->seekNext(docId+1)) { - if (_warmup.doUnpack()) { + if (_warmup->doUnpack()) { it->unpack(docId); } } diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h index d18e43b56a7..5292fb36298 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h @@ -10,11 +10,12 @@ namespace searchcorespi { class FieldTermMap; +class WarmupIndexCollection; class IWarmupDone { public: virtual ~IWarmupDone() { } - virtual void warmupDone(ISearchableIndexCollection::SP current) = 0; + virtual void warmupDone(std::shared_ptr<WarmupIndexCollection> current) = 0; }; /** * Index collection that holds a reference to the active one and a new one that @@ -30,7 +31,7 @@ public: ISearchableIndexCollection::SP prev, ISearchableIndexCollection::SP next, IndexSearchable & warmup, - vespalib::SyncableThreadExecutor & executor, + vespalib::Executor & executor, IWarmupDone & warmupDone); ~WarmupIndexCollection() override; // Implements IIndexCollection @@ -64,25 +65,26 @@ public: const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; } vespalib::string toString() const override; bool doUnpack() const { return _warmupConfig.getUnpack(); } + void drainPending(); private: typedef search::fef::MatchData MatchData; typedef search::queryeval::FakeRequestContext FakeRequestContext; typedef vespalib::Executor::Task Task; class WarmupTask : public Task { public: - WarmupTask(std::unique_ptr<MatchData> md, WarmupIndexCollection & warmup); + WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup); ~WarmupTask() override; WarmupTask &createBlueprint(const FieldSpec &field, const Node &term) { - _bluePrint = _warmup.createBlueprint(_requestContext, field, term); + _bluePrint = _warmup->createBlueprint(_requestContext, field, term); return *this; } WarmupTask &createBlueprint(const FieldSpecList &fields, const Node &term) { - _bluePrint = _warmup.createBlueprint(_requestContext, fields, term); + _bluePrint = _warmup->createBlueprint(_requestContext, fields, term); return *this; } private: void run() override; - WarmupIndexCollection & _warmup; + std::shared_ptr<WarmupIndexCollection> _warmup; std::unique_ptr<MatchData> _matchData; Blueprint::UP _bluePrint; FakeRequestContext _requestContext; @@ -95,11 +97,12 @@ private: ISearchableIndexCollection::SP _prev; ISearchableIndexCollection::SP _next; IndexSearchable & _warmup; - vespalib::SyncableThreadExecutor & _executor; + vespalib::Executor & _executor; IWarmupDone & _warmupDone; vespalib::steady_time _warmupEndTime; std::mutex _lock; std::unique_ptr<FieldTermMap> _handledTerms; + std::atomic<uint64_t> _pendingTasks; }; } // namespace searchcorespi |