summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2023-10-05 13:46:25 +0200
committerTor Egge <Tor.Egge@online.no>2023-10-05 13:46:25 +0200
commitdf2adb618affa36921cb1fcf2d3f1df3e4cc0f4c (patch)
tree3ab3f9a9d8dcc932ebc5405655cb77f30f79e5c7 /searchcore
parent32549787ba2720e3713b65fd44f0553aedb6f054 (diff)
Style fixes for searchcorespi::index::IndexMaintainer.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp1
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp127
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainer.h123
3 files changed, 124 insertions, 127 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
index 251d0475537..15943a02119 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp
@@ -3,6 +3,7 @@
#include "indexmanager.h"
#include "diskindexwrapper.h"
#include "memoryindexwrapper.h"
+#include <vespa/searchcorespi/index/indexmaintainerconfig.h>
#include <vespa/searchlib/common/serialnumfileheadercontext.h>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/index/schemautil.h>
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
index f6b05f639ed..9b6208db306 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -1,11 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "indexmaintainer.h"
+#include "disk_indexes.h"
#include "diskindexcleaner.h"
#include "eventlogger.h"
#include "fusionrunner.h"
+#include "indexcollection.h"
#include "indexflushtarget.h"
#include "indexfusiontarget.h"
+#include "indexmaintainerconfig.h"
#include "indexreadutilities.h"
#include "indexwriteutilities.h"
#include "index_disk_dir.h"
@@ -93,13 +96,13 @@ SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max();
class DiskIndexWithDestructorCallback : public IDiskIndex {
private:
std::shared_ptr<IDestructorCallback> _callback;
- IDiskIndex::SP _index;
+ std::shared_ptr<IDiskIndex> _index;
IndexDiskDir _index_disk_dir;
IndexDiskLayout& _layout;
DiskIndexes& _disk_indexes;
public:
- DiskIndexWithDestructorCallback(IDiskIndex::SP index,
+ DiskIndexWithDestructorCallback(std::shared_ptr<IDiskIndex> index,
std::shared_ptr<IDestructorCallback> callback,
IndexDiskLayout& layout,
DiskIndexes& disk_indexes) noexcept
@@ -116,7 +119,7 @@ public:
/**
* Implements searchcorespi::IndexSearchable
*/
- Blueprint::UP
+ std::unique_ptr<Blueprint>
createBlueprint(const IRequestContext & requestContext,
const FieldSpec &field,
const Node &term) override
@@ -125,7 +128,7 @@ public:
fsl.add(field);
return _index->createBlueprint(requestContext, fsl, term);
}
- Blueprint::UP
+ std::unique_ptr<Blueprint>
createBlueprint(const IRequestContext & requestContext,
const FieldSpecList &fields,
const Node &term) override
@@ -184,10 +187,11 @@ IndexMaintainer::FusionArgs::~FusionArgs() = default;
IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default;
IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default;
-uint32_t
-IndexMaintainer::getNewAbsoluteId()
+void
+IndexMaintainer::set_id_for_new_memory_index()
{
- return _next_id++;
+ _current_index_id = _next_id++ - _last_fusion_id;
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
}
string
@@ -221,7 +225,7 @@ IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll)
LOG(error, "Could not open schema '%s'", schemaName.c_str());
}
if (trimmedSchema != d->getSchema()) {
- IDiskIndex::SP newIndex(reloadDiskIndex(*d));
+ std::shared_ptr<IDiskIndex> newIndex(reloadDiskIndex(*d));
coll.replace(coll.getSourceId(i), newIndex);
hasReopenedAnything = true;
}
@@ -265,9 +269,9 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
for (;;) {
- Schema::SP activeFusionSchema;
- Schema::SP activeFusionPrunedSchema;
- Schema::SP newActiveFusionPrunedSchema;
+ std::shared_ptr<Schema> activeFusionSchema;
+ std::shared_ptr<Schema> activeFusionPrunedSchema;
+ std::shared_ptr<Schema> newActiveFusionPrunedSchema;
{
LockGuard lock(_state_lock);
activeFusionSchema = _activeFusionSchema;
@@ -276,10 +280,10 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema)
if (!activeFusionSchema)
return; // No active fusion
if (!activeFusionPrunedSchema) {
- Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema);
+ std::unique_ptr<Schema> newSchema = Schema::intersect(*activeFusionSchema, schema);
newActiveFusionPrunedSchema = std::move(newSchema);
} else {
- Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema);
+ std::unique_ptr<Schema> newSchema = Schema::intersect(*activeFusionPrunedSchema, schema);
newActiveFusionPrunedSchema = std::move(newSchema);
}
{
@@ -302,7 +306,7 @@ IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir)
removeOldDiskIndexes();
}
-IDiskIndex::SP
+std::shared_ptr<IDiskIndex>
IndexMaintainer::loadDiskIndex(const string &indexDir)
{
// Called by a flush worker thread OR CTOR (in document db init executor thread)
@@ -323,7 +327,7 @@ IndexMaintainer::loadDiskIndex(const string &indexDir)
return retval;
}
-IDiskIndex::SP
+std::shared_ptr<IDiskIndex>
IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
{
// Called by a flush worker thread OR document db executor thread
@@ -346,7 +350,7 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
return retval;
}
-IDiskIndex::SP
+std::shared_ptr<IDiskIndex>
IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex,
uint32_t indexId,
uint32_t docIdLimit,
@@ -356,7 +360,7 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex,
// Called by a flush worker thread
const string flushDir = getFlushDir(indexId);
memoryIndex.flushToDisk(flushDir, docIdLimit, serialNum);
- Schema::SP prunedSchema(memoryIndex.getPrunedSchema());
+ std::shared_ptr<Schema> prunedSchema(memoryIndex.getPrunedSchema());
if (prunedSchema) {
updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh);
}
@@ -366,8 +370,8 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex,
return loadDiskIndex(flushDir);
}
-ISearchableIndexCollection::UP
-IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList)
+std::unique_ptr<ISearchableIndexCollection>
+IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, std::unique_ptr<ISearchableIndexCollection> sourceList)
{
// Called by CTOR (in document db init executor thread)
uint32_t fusion_id = spec.last_fusion_id;
@@ -386,8 +390,8 @@ namespace {
using LockGuard = std::lock_guard<std::mutex>;
-ISearchableIndexCollection::SP
-getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false)
+std::shared_ptr<ISearchableIndexCollection>
+getLeaf(const LockGuard &newSearchLock, const std::shared_ptr<ISearchableIndexCollection>& is, bool warn=false)
{
if (dynamic_cast<const WarmupIndexCollection *>(is.get()) != nullptr) {
if (warn) {
@@ -408,11 +412,11 @@ getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & i
* Caller must hold _state_lock (SL).
*/
void
-IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source)
+IndexMaintainer::replaceSource(uint32_t sourceId, const std::shared_ptr<IndexSearchable>& source)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
LockGuard lock(_new_search_lock);
- ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock);
+ std::unique_ptr<ISearchableIndexCollection> indexes = createNewSourceCollection(lock);
indexes->replace(sourceId, source);
swapInNewIndex(lock, std::move(indexes), *source);
}
@@ -423,7 +427,7 @@ IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &sou
*/
void
IndexMaintainer::swapInNewIndex(LockGuard & guard,
- ISearchableIndexCollection::SP indexes,
+ std::shared_ptr<ISearchableIndexCollection> indexes,
IndexSearchable & source)
{
assert(indexes->valid());
@@ -448,19 +452,19 @@ IndexMaintainer::swapInNewIndex(LockGuard & guard,
* Caller must hold _state_lock (SL).
*/
void
-IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source)
+IndexMaintainer::appendSource(uint32_t sourceId, const std::shared_ptr<IndexSearchable>& source)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
LockGuard lock(_new_search_lock);
- ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock);
+ std::unique_ptr<ISearchableIndexCollection> indexes = createNewSourceCollection(lock);
indexes->append(sourceId, source);
swapInNewIndex(lock, std::move(indexes), *source);
}
-ISearchableIndexCollection::UP
+std::unique_ptr<ISearchableIndexCollection>
IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock)
{
- ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list));
+ std::shared_ptr<ISearchableIndexCollection> currentLeaf(getLeaf(newSearchLock, _source_list));
return std::make_unique<IndexCollection>(_selector, *currentLeaf);
}
@@ -482,13 +486,13 @@ IndexMaintainer::FlushArgs::FlushArgs(FlushArgs &&) = default;
IndexMaintainer::FlushArgs & IndexMaintainer::FlushArgs::operator=(FlushArgs &&) = default;
bool
-IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
+IndexMaintainer::doneInitFlush(FlushArgs *args, std::shared_ptr<IMemoryIndex>* new_index)
{
// Called by initFlush via reconfigurer
assert(_ctx.getThreadingService().master().isCurrentThread());
LockGuard state_lock(_state_lock);
args->old_index = _current_index;
- args->old_absolute_id = _current_index_id + _last_fusion_id;
+ args->old_absolute_id = get_absolute_id();
args->old_source_list = _source_list;
string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id));
args->flush_serial_num = current_serial_num();
@@ -513,9 +517,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
if (!args->_skippedEmptyLast) {
// Keep on using same source selector with extended valid range
args->save_info = getSourceSelector().extractSaveInfo(selector_name);
- // XXX: Overflow issue in source selector
- _current_index_id = getNewAbsoluteId() - _last_fusion_id;
- assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ set_id_for_new_memory_index();
_selector->setDefaultSource(_current_index_id);
_source_selector_changes = 0;
}
@@ -604,10 +606,10 @@ IndexMaintainer::flushMemoryIndex(FlushArgs &args,
// Called by a flush worker thread
ChangeGens changeGens = getChangeGens();
IMemoryIndex &memoryIndex = *args.old_index;
- Schema::SP prunedSchema = memoryIndex.getPrunedSchema();
- IDiskIndex::SP diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id,
- docIdLimit, args.flush_serial_num,
- saveInfo);
+ std::shared_ptr<Schema> prunedSchema = memoryIndex.getPrunedSchema();
+ std::shared_ptr<IDiskIndex> diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id,
+ docIdLimit, args.flush_serial_num,
+ saveInfo);
// Post processing after memory index has been written to disk and
// opened as disk index.
args._changeGens = changeGens;
@@ -619,7 +621,7 @@ IndexMaintainer::flushMemoryIndex(FlushArgs &args,
void
-IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex)
+IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, std::shared_ptr<IDiskIndex>& diskIndex)
{
// Called by a flush worker thread
for (;;) {
@@ -631,12 +633,12 @@ IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskInde
return;
}
ChangeGens changeGens = getChangeGens();
- Schema::SP prunedSchema = args.old_index->getPrunedSchema();
+ std::shared_ptr<Schema> prunedSchema = args.old_index->getPrunedSchema();
const string indexDir = getFlushDir(args.old_absolute_id);
if (prunedSchema) {
updateDiskIndexSchema(indexDir, *prunedSchema, noSerialNumHigh);
}
- IDiskIndex::SP reloadedDiskIndex = reloadDiskIndex(*diskIndex);
+ std::shared_ptr<IDiskIndex> reloadedDiskIndex = reloadDiskIndex(*diskIndex);
diskIndex = reloadedDiskIndex;
args._changeGens = changeGens;
args._prunedSchema = prunedSchema;
@@ -645,7 +647,7 @@ IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskInde
bool
-IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) {
+IndexMaintainer::doneFlush(FlushArgs *args, std::shared_ptr<IDiskIndex> *disk_index) {
// Called by doFlush via reconfigurer
assert(_ctx.getThreadingService().master().isCurrentThread());
LockGuard state_lock(_state_lock);
@@ -683,7 +685,7 @@ IndexMaintainer::canRunFusion(const FusionSpec &spec) const
}
bool
-IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
+IndexMaintainer::doneFusion(FusionArgs *args, std::shared_ptr<IDiskIndex>* new_index)
{
// Called by runFusion via reconfigurer
assert(_ctx.getThreadingService().master().isCurrentThread());
@@ -711,12 +713,12 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
_activeFusionPrunedSchema.reset();
}
- ISearchableIndexCollection::SP currentLeaf;
+ std::shared_ptr<ISearchableIndexCollection> currentLeaf;
{
LockGuard lock(_new_search_lock);
currentLeaf = getLeaf(lock, _source_list);
}
- ISearchableIndexCollection::UP fsc =
+ std::unique_ptr<ISearchableIndexCollection> fsc =
IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index);
fsc->setCurrentIndex(_current_index_id);
@@ -732,7 +734,7 @@ IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCol
{
// called by warmupDone via reconfigurer, warmupDone() doesn't wait for us
assert(_ctx.getThreadingService().master().isCurrentThread());
- ISearchableIndexCollection::SP warmIndex;
+ std::shared_ptr<ISearchableIndexCollection> warmIndex;
{
LockGuard state_lock(_state_lock);
if (keepAlive == _source_list) {
@@ -786,7 +788,7 @@ has_matching_interleaved_features(const Schema& old_schema, const Schema& new_sc
void
-IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
+IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex)
{
assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
LockGuard state_lock(_state_lock);
@@ -794,11 +796,11 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
args._oldSchema = _schema; // Delay destruction
args._oldIndex = _current_index; // Delay destruction
args._oldSourceList = _source_list; // Delay destruction
- uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id;
+ uint32_t oldAbsoluteId = get_absolute_id();
string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId));
SerialNum freezeSerialNum = current_serial_num();
bool dropEmptyLast = false;
- SaveInfo::UP saveInfo;
+ std::unique_ptr<SaveInfo> saveInfo;
LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum);
{
@@ -811,9 +813,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
if (!dropEmptyLast) {
// Keep on using same source selector with extended valid range
saveInfo = getSourceSelector().extractSaveInfo(selectorName);
- // XXX: Overflow issue in source selector
- _current_index_id = getNewAbsoluteId() - _last_fusion_id;
- assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ set_id_for_new_memory_index();
_selector->setDefaultSource(_current_index_id);
// Extra index to flush next time flushing is performed
_frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId);
@@ -842,7 +842,7 @@ IndexMaintainer::getSchema(void) const
return _schema;
}
-Schema::SP
+std::shared_ptr<Schema>
IndexMaintainer::getActiveFusionPrunedSchema(void) const
{
LockGuard lock(_index_update_lock);
@@ -938,8 +938,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff);
assert(_last_fusion_id == _selector->getBaseId());
}
- _current_index_id = getNewAbsoluteId() - _last_fusion_id;
- assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ set_id_for_new_memory_index();
_selector->setDefaultSource(_current_index_id);
auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector));
_current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num());
@@ -961,7 +960,7 @@ IndexMaintainer::~IndexMaintainer()
_selector.reset();
}
-FlushTask::UP
+std::unique_ptr<FlushTask>
IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats)
{
assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits
@@ -970,7 +969,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
set_current_serial_num(std::max(current_serial_num(), serialNum));
}
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num()));
+ std::shared_ptr<IMemoryIndex> new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num()));
FlushArgs args;
args.stats = stats;
// Ensure that all index thread tasks accessing memory index have completed.
@@ -990,7 +989,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat
LOG(debug, "No memory index to flush. Update serial number and flush time to current: "
"flushSerialNum(%" PRIu64 "), lastFlushTime(%f)",
flush_serial_num(), vespalib::to_s(_lastFlushTime.time_since_epoch()));
- return FlushTask::UP();
+ return {};
}
SerialNum realSerialNum = args.flush_serial_num;
return makeLambdaFlushTask([this, myargs=std::move(args)]() mutable { doFlush(std::move(myargs)); }, realSerialNum);
@@ -1115,12 +1114,12 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search
}
const string new_fusion_dir = getFusionDir(new_fusion_id);
- Schema::SP prunedSchema = getActiveFusionPrunedSchema();
+ std::shared_ptr<Schema> prunedSchema = getActiveFusionPrunedSchema();
if (prunedSchema) {
updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh);
}
ChangeGens changeGens = getChangeGens();
- IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir));
+ std::shared_ptr<IDiskIndex> new_index(loadDiskIndex(new_fusion_dir));
remove_fusion_index_guard.reset();
// Post processing after fusion operation has completed and new disk
@@ -1142,7 +1141,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search
if (prunedSchema) {
updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh);
}
- IDiskIndex::SP diskIndex2;
+ std::shared_ptr<IDiskIndex> diskIndex2;
diskIndex2 = reloadDiskIndex(*new_index);
new_index = diskIndex2;
args._changeGens = changeGens;
@@ -1196,7 +1195,7 @@ IndexMaintainer::getFusionStats() const
{
// Called by flush engine scheduler thread (from getFlushTargets())
FusionStats stats;
- IndexSearchable::SP source_list;
+ std::shared_ptr<IndexSearchable> source_list;
{
LockGuard lock(_new_search_lock);
@@ -1315,7 +1314,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
pruneRemovedFields(schema, serialNum);
- IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num()));
+ std::shared_ptr<IMemoryIndex> new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num()));
SetSchemaArgs args;
args._newSchema = schema;
@@ -1331,8 +1330,8 @@ void
IndexMaintainer::pruneRemovedFields(const Schema &schema, SerialNum serialNum)
{
assert(_ctx.getThreadingService().master().isCurrentThread());
- ISearchableIndexCollection::SP new_source_list;
- IIndexCollection::SP coll = getSourceCollection();
+ std::shared_ptr<ISearchableIndexCollection> new_source_list;
+ std::shared_ptr<IIndexCollection> coll = getSourceCollection();
updateIndexSchemas(*coll, schema, serialNum);
updateActiveFusionPrunedSchema(schema);
{
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
index 5cf8c2f67d5..963e669bc4c 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -2,18 +2,11 @@
#pragma once
#include "iindexmanager.h"
-#include "disk_indexes.h"
#include "fusionspec.h"
-#include "idiskindex.h"
#include "iindexmaintaineroperations.h"
#include "indexdisklayout.h"
-#include "indexmaintainerconfig.h"
#include "indexmaintainercontext.h"
-#include "imemoryindex.h"
#include "warmupindexcollection.h"
-#include "ithreadingservice.h"
-#include "indexsearchable.h"
-#include "indexcollection.h"
#include <vespa/searchcorespi/flush/iflushtarget.h>
#include <vespa/searchcorespi/flush/flushstats.h>
#include <vespa/searchlib/attribute/fixedsourceselector.h>
@@ -27,6 +20,9 @@ namespace vespalib { class Gate; }
namespace searchcorespi::index {
+class DiskIndexes;
+class IndexMaintainerConfig;
+
/**
* The IndexMaintainer provides a holistic view of a set of disk and
* memory indexes. It allows updating the active memory index, enables search
@@ -42,16 +38,15 @@ class IndexMaintainer : public IIndexManager,
public:
using SaveInfo = search::FixedSourceSelector::SaveInfo;
using SerialNum = search::SerialNum;
- using SaveInfoSP = std::shared_ptr<SaveInfo>;
- IMemoryIndex::SP _index;
- SerialNum _serialNum;
- SaveInfoSP _saveInfo;
- uint32_t _absoluteId;
+ std::shared_ptr<IMemoryIndex> _index;
+ SerialNum _serialNum;
+ std::shared_ptr<SaveInfo> _saveInfo;
+ uint32_t _absoluteId;
- FrozenMemoryIndexRef(const IMemoryIndex::SP &index,
+ FrozenMemoryIndexRef(const std::shared_ptr<IMemoryIndex> &index,
SerialNum serialNum,
- SaveInfo::UP saveInfo,
+ std::unique_ptr<SaveInfo> saveInfo,
uint32_t absoluteId)
: _index(index),
_serialNum(serialNum),
@@ -78,20 +73,20 @@ class IndexMaintainer : public IIndexManager,
const vespalib::string _base_dir;
const WarmupConfig _warmupConfig;
- DiskIndexes::SP _disk_indexes;
+ std::shared_ptr<DiskIndexes> _disk_indexes;
IndexDiskLayout _layout;
- Schema _schema; // Protected by SL + IUL
- Schema::SP _activeFusionSchema; // Protected by SL + IUL
+ Schema _schema; // Protected by SL + IUL
+ std::shared_ptr<Schema> _activeFusionSchema; // Protected by SL + IUL
// Protected by SL + IUL
- Schema::SP _activeFusionPrunedSchema;
+ std::shared_ptr<Schema> _activeFusionPrunedSchema;
uint32_t _source_selector_changes; // Protected by IUL
// _selector is protected by SL + IUL
- ISourceSelector::SP _selector;
- ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread
+ std::shared_ptr<ISourceSelector> _selector;
+ std::shared_ptr<ISearchableIndexCollection> _source_list; // Protected by SL + NSL, only set by master thread
uint32_t _last_fusion_id; // Protected by SL + IUL
uint32_t _next_id; // Protected by SL + IUL
uint32_t _current_index_id; // Protected by SL + IUL
- IMemoryIndex::SP _current_index; // Protected by SL + IUL
+ std::shared_ptr<IMemoryIndex> _current_index; // Protected by SL + IUL
bool _flush_empty_current_index;
std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL
std::atomic<SerialNum> _flush_serial_num; // Writes protected by SL
@@ -130,24 +125,26 @@ class IndexMaintainer : public IIndexManager,
* and pruning of removed fields, since this will trigger more retries for
* some of the operations.
*/
- std::mutex _state_lock; // Outer lock (SL)
- mutable std::mutex _index_update_lock; // Inner lock (IUL)
- mutable std::mutex _new_search_lock; // Inner lock (NSL)
- std::mutex _remove_lock; // Lock for removing indexes.
- // Protected by SL + IUL
- FusionSpec _fusion_spec; // Protected by FL
- mutable std::mutex _fusion_lock; // Fusion spec lock (FL)
- uint32_t _maxFlushed;
- uint32_t _maxFrozen;
- ChangeGens _changeGens; // Protected by SL + IUL
- std::mutex _schemaUpdateLock; // Serialize rewrite of schema
+ std::mutex _state_lock; // Outer lock (SL)
+ mutable std::mutex _index_update_lock; // Inner lock (IUL)
+ mutable std::mutex _new_search_lock; // Inner lock (NSL)
+ std::mutex _remove_lock; // Lock for removing indexes.
+ FusionSpec _fusion_spec; // Protected by FL
+ mutable std::mutex _fusion_lock; // Fusion spec lock (FL)
+ uint32_t _maxFlushed; // Protected by NSL
+ const uint32_t _maxFrozen;
+ ChangeGens _changeGens; // Protected by SL + IUL
+ std::mutex _schemaUpdateLock; // Serialize rewrite of schema
const search::TuneFileAttributes _tuneFileAttributes;
const IndexMaintainerContext _ctx;
- IIndexMaintainerOperations &_operations;
+ IIndexMaintainerOperations& _operations;
search::FixedSourceSelector & getSourceSelector() { return static_cast<search::FixedSourceSelector &>(*_selector); }
const search::FixedSourceSelector & getSourceSelector() const { return static_cast<const search::FixedSourceSelector &>(*_selector); }
- uint32_t getNewAbsoluteId();
+ // get absolute id of current memory index, caller holds SL or IUL
+ uint32_t get_absolute_id() const noexcept { return _last_fusion_id + _current_index_id; }
+ // set id for new memory index, other callers than constructor holds SL and IUL
+ void set_id_for_new_memory_index();
vespalib::string getFlushDir(uint32_t sourceId) const;
vespalib::string getFusionDir(uint32_t sourceId) const;
@@ -168,26 +165,26 @@ class IndexMaintainer : public IIndexManager,
void updateActiveFusionPrunedSchema(const Schema &schema);
void deactivateDiskIndexes(vespalib::string indexDir);
- IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir);
- IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex);
+ std::shared_ptr<IDiskIndex> loadDiskIndex(const vespalib::string &indexDir);
+ std::shared_ptr<IDiskIndex> reloadDiskIndex(const IDiskIndex &oldIndex);
- IDiskIndex::SP flushMemoryIndex(IMemoryIndex &memoryIndex,
- uint32_t indexId,
- uint32_t docIdLimit,
- SerialNum serialNum,
- search::FixedSourceSelector::SaveInfo &saveInfo);
+ std::shared_ptr<IDiskIndex> flushMemoryIndex(IMemoryIndex &memoryIndex,
+ uint32_t indexId,
+ uint32_t docIdLimit,
+ SerialNum serialNum,
+ search::FixedSourceSelector::SaveInfo &saveInfo);
- ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList);
- void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source);
- void appendSource(uint32_t sourceId, const IndexSearchable::SP &source);
- void swapInNewIndex(LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source);
- ISearchableIndexCollection::UP createNewSourceCollection(const LockGuard &newSearchLock);
+ std::unique_ptr<ISearchableIndexCollection> loadDiskIndexes(const FusionSpec &spec, std::unique_ptr<ISearchableIndexCollection> sourceList);
+ void replaceSource(uint32_t sourceId, const std::shared_ptr<IndexSearchable>& source);
+ void appendSource(uint32_t sourceId, const std::shared_ptr<IndexSearchable>& source);
+ void swapInNewIndex(LockGuard & guard, std::shared_ptr<ISearchableIndexCollection> indexes, IndexSearchable & source);
+ std::unique_ptr<ISearchableIndexCollection> createNewSourceCollection(const LockGuard &newSearchLock);
struct FlushArgs {
- IMemoryIndex::SP old_index; // Last memory index
+ std::shared_ptr<IMemoryIndex> old_index; // Last memory index
uint32_t old_absolute_id;
- ISearchableIndexCollection::SP old_source_list; // Delays destruction
- search::FixedSourceSelector::SaveInfo::SP save_info;
+ std::shared_ptr<ISearchableIndexCollection> old_source_list; // Delays destruction
+ std::shared_ptr<search::FixedSourceSelector::SaveInfo> save_info;
SerialNum flush_serial_num;
searchcorespi::FlushStats * stats;
bool _skippedEmptyLast; // Don't flush empty memory index
@@ -198,7 +195,7 @@ class IndexMaintainer : public IIndexManager,
// or data structure limitations).
FrozenMemoryIndexRefs _extraIndexes;
ChangeGens _changeGens;
- Schema::SP _prunedSchema;
+ std::shared_ptr<Schema> _prunedSchema;
FlushArgs();
FlushArgs(const FlushArgs &) = delete;
@@ -208,15 +205,15 @@ class IndexMaintainer : public IIndexManager,
~FlushArgs();
};
- bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index);
+ bool doneInitFlush(FlushArgs *args, std::shared_ptr<IMemoryIndex> *new_index);
void doFlush(FlushArgs args);
void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds);
void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds);
void updateFlushStats(const FlushArgs &args);
void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit,
search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds);
- void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex);
- bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index);
+ void reconfigureAfterFlush(FlushArgs &args, std::shared_ptr<IDiskIndex>& diskIndex);
+ bool doneFlush(FlushArgs *args, std::shared_ptr<IDiskIndex> *disk_index);
class FusionArgs {
@@ -224,8 +221,8 @@ class IndexMaintainer : public IIndexManager,
uint32_t _new_fusion_id;
ChangeGens _changeGens;
Schema _schema;
- Schema::SP _prunedSchema;
- ISearchableIndexCollection::SP _old_source_list; // Delays destruction
+ std::shared_ptr<Schema> _prunedSchema;
+ std::shared_ptr<ISearchableIndexCollection> _old_source_list; // Delays destruction
FusionArgs();
~FusionArgs();
@@ -233,23 +230,23 @@ class IndexMaintainer : public IIndexManager,
void scheduleFusion(const FlushIds &flushIds);
bool canRunFusion(const FusionSpec &spec) const;
- bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index);
+ bool doneFusion(FusionArgs *args, std::shared_ptr<IDiskIndex> *new_index);
class SetSchemaArgs {
public:
Schema _newSchema;
Schema _oldSchema;
- IMemoryIndex::SP _oldIndex;
- ISearchableIndexCollection::SP _oldSourceList; // Delays destruction
+ std::shared_ptr<IMemoryIndex> _oldIndex;
+ std::shared_ptr<ISearchableIndexCollection> _oldSourceList; // Delays destruction
SetSchemaArgs();
~SetSchemaArgs();
};
- void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex);
+ void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex);
Schema getSchema(void) const;
- Schema::SP getActiveFusionPrunedSchema() const;
+ std::shared_ptr<Schema> getActiveFusionPrunedSchema() const;
search::TuneFileAttributes getAttrTune();
ChangeGens getChangeGens();
@@ -289,7 +286,7 @@ public:
* Starts a new MemoryIndex, and dumps the previous one to disk.
* Updates flush stats when finished if specified.
**/
- FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats);
+ std::unique_ptr<FlushTask> initFlush(SerialNum serialNum, FlushStats * stats);
FusionSpec getFusionSpec();
/**
@@ -353,12 +350,12 @@ public:
return flush_serial_num();
}
- IIndexCollection::SP getSourceCollection() const {
+ std::shared_ptr<IIndexCollection> getSourceCollection() const {
LockGuard lock(_new_search_lock);
return _source_list;
}
- searchcorespi::IndexSearchable::SP getSearchable() const override {
+ std::shared_ptr<searchcorespi::IndexSearchable> getSearchable() const override {
LockGuard lock(_new_search_lock);
return _source_list;
}