aboutsummaryrefslogtreecommitdiffstats
path: root/searchcorespi
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-26 09:00:38 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-26 10:05:48 +0000
commit1746d4a66d9696da6ae323c4532cbae4d5117498 (patch)
tree6020b5df907f4f68f35ea048581a7344eef58c9f /searchcorespi
parent1a9d505710061196383e649f0918cca7cd41a066 (diff)
Avoid requiring a syncable executor for warmup.
Diffstat (limited to 'searchcorespi')
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp12
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h4
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h6
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp33
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h17
6 files changed, 44 insertions, 30 deletions
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
index 19469d59d1b..a2bd19c3d29 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -328,10 +328,8 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex,
updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh);
}
IndexWriteUtilities::writeSourceSelector(saveInfo, indexId, getAttrTune(),
- _ctx.getFileHeaderContext(),
- serialNum);
- IndexWriteUtilities::writeSerialNum(serialNum, flushDir,
- _ctx.getFileHeaderContext());
+ _ctx.getFileHeaderContext(), serialNum);
+ IndexWriteUtilities::writeSerialNum(serialNum, flushDir, _ctx.getFileHeaderContext());
return loadDiskIndex(flushDir);
}
@@ -696,7 +694,7 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
}
bool
-IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive)
+IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive)
{
// called by warmupDone via reconfigurer, warmupDone() doesn't wait for us
assert(_ctx.getThreadingService().master().isCurrentThread());
@@ -713,13 +711,13 @@ IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP
LOG(info, "New index warmed up and switched in : %s", warmIndex->toString().c_str());
}
LOG(info, "Sync warmupExecutor.");
- _ctx.getWarmupExecutor().sync();
+ keepAlive->drainPending();
LOG(info, "Now the keep alive of the warmupindexcollection should be gone.");
return true;
}
void
-IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current)
+IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current)
{
// Called by a search thread
LockGuard lock(_new_search_lock);
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
index 6e4eb32ee50..8213c02b90c 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -257,8 +257,8 @@ class IndexMaintainer : public IIndexManager,
* result.
*/
bool reconfigure(std::unique_ptr<Configure> configure);
- void warmupDone(ISearchableIndexCollection::SP current) override;
- bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive);
+ void warmupDone(std::shared_ptr<WarmupIndexCollection> current) override;
+ bool makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive);
void commit_and_wait();
void commit(vespalib::Gate& gate);
void pruneRemovedFields(const Schema &schema, SerialNum serialNum);
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
index 522789e7fe8..efd7827fc3d 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
@@ -11,7 +11,7 @@ namespace searchcorespi::index {
IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService,
IIndexManager::Reconfigurer &reconfigurer,
const FileHeaderContext &fileHeaderContext,
- vespalib::SyncableThreadExecutor & warmupExecutor)
+ vespalib::Executor & warmupExecutor)
: _threadingService(threadingService),
_reconfigurer(reconfigurer),
_fileHeaderContext(fileHeaderContext),
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h
index c90659c55bf..2c7aa4af48e 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h
@@ -17,13 +17,13 @@ private:
IThreadingService &_threadingService;
IIndexManager::Reconfigurer &_reconfigurer;
const search::common::FileHeaderContext &_fileHeaderContext;
- vespalib::SyncableThreadExecutor & _warmupExecutor;
+ vespalib::Executor & _warmupExecutor;
public:
IndexMaintainerContext(IThreadingService &threadingService,
IIndexManager::Reconfigurer &reconfigurer,
const search::common::FileHeaderContext &fileHeaderContext,
- vespalib::SyncableThreadExecutor & warmupExecutor);
+ vespalib::Executor & warmupExecutor);
/**
* Returns the treading service that encapsulates the thread model used for writing.
@@ -49,7 +49,7 @@ public:
/**
* @return The executor that should be used for warmup.
*/
- vespalib::SyncableThreadExecutor & getWarmupExecutor() const { return _warmupExecutor; }
+ vespalib::Executor & getWarmupExecutor() const { return _warmupExecutor; }
};
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
index d6aba7c6ff1..0a24efd0e66 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
@@ -5,6 +5,7 @@
#include <vespa/searchlib/query/tree/termnodes.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/stllike/hash_set.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP(".searchcorespi.index.warmupindexcollection");
@@ -29,7 +30,7 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig,
ISearchableIndexCollection::SP prev,
ISearchableIndexCollection::SP next,
IndexSearchable & warmup,
- vespalib::SyncableThreadExecutor & executor,
+ vespalib::Executor & executor,
IWarmupDone & warmupDone) :
_warmupConfig(warmupConfig),
_prev(std::move(prev)),
@@ -38,7 +39,8 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig,
_executor(executor),
_warmupDone(warmupDone),
_warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()),
- _handledTerms(std::make_unique<FieldTermMap>())
+ _handledTerms(std::make_unique<FieldTermMap>()),
+ _pendingTasks(0)
{
if (_next->valid()) {
setCurrentIndex(_next->getCurrentIndex());
@@ -79,7 +81,7 @@ WarmupIndexCollection::~WarmupIndexCollection()
if (_warmupEndTime != vespalib::steady_time()) {
LOG(info, "Warmup aborted due to new state change or application shutdown");
}
- _executor.sync();
+ assert(_pendingTasks == 0);
}
const ISourceSelector &
@@ -164,7 +166,7 @@ WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext,
needWarmUp = needWarmUp || ! handledBefore(fs.getFieldId(), term);
}
if (needWarmUp) {
- auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), *this);
+ auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), shared_from_this());
task->createBlueprint(fsl, term);
fireWarmup(std::move(task));
}
@@ -216,25 +218,36 @@ WarmupIndexCollection::getSearchableSP(uint32_t i) const
return _next->getSearchableSP(i);
}
-WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, WarmupIndexCollection & warmup)
- : _warmup(warmup),
+void
+WarmupIndexCollection::drainPending() {
+ while (_pendingTasks > 0) {
+ std::this_thread::sleep_for(1ms);
+ }
+}
+
+WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup)
+ : _warmup(std::move(warmup)),
_matchData(std::move(md)),
_bluePrint(),
_requestContext()
-{ }
+{
+ _warmup->_pendingTasks++;
+}
-WarmupIndexCollection::WarmupTask::~WarmupTask() = default;
+WarmupIndexCollection::WarmupTask::~WarmupTask() {
+ _warmup->_pendingTasks--;
+}
void
WarmupIndexCollection::WarmupTask::run()
{
- if (_warmup._warmupEndTime != vespalib::steady_time()) {
+ if (_warmup->_warmupEndTime != vespalib::steady_time()) {
LOG(debug, "Warming up %s", _bluePrint->asString().c_str());
_bluePrint->fetchPostings(search::queryeval::ExecuteInfo::TRUE);
SearchIterator::UP it(_bluePrint->createSearch(*_matchData, true));
it->initFullRange();
for (uint32_t docId = it->seekFirst(1); !it->isAtEnd(); docId = it->seekNext(docId+1)) {
- if (_warmup.doUnpack()) {
+ if (_warmup->doUnpack()) {
it->unpack(docId);
}
}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
index d18e43b56a7..5292fb36298 100644
--- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
@@ -10,11 +10,12 @@
namespace searchcorespi {
class FieldTermMap;
+class WarmupIndexCollection;
class IWarmupDone {
public:
virtual ~IWarmupDone() { }
- virtual void warmupDone(ISearchableIndexCollection::SP current) = 0;
+ virtual void warmupDone(std::shared_ptr<WarmupIndexCollection> current) = 0;
};
/**
* Index collection that holds a reference to the active one and a new one that
@@ -30,7 +31,7 @@ public:
ISearchableIndexCollection::SP prev,
ISearchableIndexCollection::SP next,
IndexSearchable & warmup,
- vespalib::SyncableThreadExecutor & executor,
+ vespalib::Executor & executor,
IWarmupDone & warmupDone);
~WarmupIndexCollection() override;
// Implements IIndexCollection
@@ -64,25 +65,26 @@ public:
const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; }
vespalib::string toString() const override;
bool doUnpack() const { return _warmupConfig.getUnpack(); }
+ void drainPending();
private:
typedef search::fef::MatchData MatchData;
typedef search::queryeval::FakeRequestContext FakeRequestContext;
typedef vespalib::Executor::Task Task;
class WarmupTask : public Task {
public:
- WarmupTask(std::unique_ptr<MatchData> md, WarmupIndexCollection & warmup);
+ WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup);
~WarmupTask() override;
WarmupTask &createBlueprint(const FieldSpec &field, const Node &term) {
- _bluePrint = _warmup.createBlueprint(_requestContext, field, term);
+ _bluePrint = _warmup->createBlueprint(_requestContext, field, term);
return *this;
}
WarmupTask &createBlueprint(const FieldSpecList &fields, const Node &term) {
- _bluePrint = _warmup.createBlueprint(_requestContext, fields, term);
+ _bluePrint = _warmup->createBlueprint(_requestContext, fields, term);
return *this;
}
private:
void run() override;
- WarmupIndexCollection & _warmup;
+ std::shared_ptr<WarmupIndexCollection> _warmup;
std::unique_ptr<MatchData> _matchData;
Blueprint::UP _bluePrint;
FakeRequestContext _requestContext;
@@ -95,11 +97,12 @@ private:
ISearchableIndexCollection::SP _prev;
ISearchableIndexCollection::SP _next;
IndexSearchable & _warmup;
- vespalib::SyncableThreadExecutor & _executor;
+ vespalib::Executor & _executor;
IWarmupDone & _warmupDone;
vespalib::steady_time _warmupEndTime;
std::mutex _lock;
std::unique_ptr<FieldTermMap> _handledTerms;
+ std::atomic<uint64_t> _pendingTasks;
};
} // namespace searchcorespi