From df2adb618affa36921cb1fcf2d3f1df3e4cc0f4c Mon Sep 17 00:00:00 2001 From: Tor Egge Date: Thu, 5 Oct 2023 13:46:25 +0200 Subject: Style fixes for searchcorespi::index::IndexMaintainer. --- .../vespa/searchcore/proton/index/indexmanager.cpp | 1 + .../vespa/searchcorespi/index/indexmaintainer.cpp | 127 ++++++++++----------- .../vespa/searchcorespi/index/indexmaintainer.h | 123 ++++++++++---------- 3 files changed, 124 insertions(+), 127 deletions(-) (limited to 'searchcore/src') diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp index 251d0475537..15943a02119 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.cpp @@ -3,6 +3,7 @@ #include "indexmanager.h" #include "diskindexwrapper.h" #include "memoryindexwrapper.h" +#include #include #include #include diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp index f6b05f639ed..9b6208db306 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -1,11 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "indexmaintainer.h" +#include "disk_indexes.h" #include "diskindexcleaner.h" #include "eventlogger.h" #include "fusionrunner.h" +#include "indexcollection.h" #include "indexflushtarget.h" #include "indexfusiontarget.h" +#include "indexmaintainerconfig.h" #include "indexreadutilities.h" #include "indexwriteutilities.h" #include "index_disk_dir.h" @@ -93,13 +96,13 @@ SerialNum noSerialNumHigh = std::numeric_limits::max(); class DiskIndexWithDestructorCallback : public IDiskIndex { private: std::shared_ptr _callback; - IDiskIndex::SP _index; + std::shared_ptr _index; IndexDiskDir _index_disk_dir; IndexDiskLayout& _layout; DiskIndexes& _disk_indexes; public: - DiskIndexWithDestructorCallback(IDiskIndex::SP index, + DiskIndexWithDestructorCallback(std::shared_ptr index, std::shared_ptr callback, IndexDiskLayout& layout, DiskIndexes& disk_indexes) noexcept @@ -116,7 +119,7 @@ public: /** * Implements searchcorespi::IndexSearchable */ - Blueprint::UP + std::unique_ptr createBlueprint(const IRequestContext & requestContext, const FieldSpec &field, const Node &term) override @@ -125,7 +128,7 @@ public: fsl.add(field); return _index->createBlueprint(requestContext, fsl, term); } - Blueprint::UP + std::unique_ptr createBlueprint(const IRequestContext & requestContext, const FieldSpecList &fields, const Node &term) override @@ -184,10 +187,11 @@ IndexMaintainer::FusionArgs::~FusionArgs() = default; IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default; IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default; -uint32_t -IndexMaintainer::getNewAbsoluteId() +void +IndexMaintainer::set_id_for_new_memory_index() { - return _next_id++; + _current_index_id = _next_id++ - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); } string @@ -221,7 +225,7 @@ IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll) LOG(error, "Could not open schema '%s'", schemaName.c_str()); } if (trimmedSchema != d->getSchema()) { - IDiskIndex::SP newIndex(reloadDiskIndex(*d)); + std::shared_ptr newIndex(reloadDiskIndex(*d)); coll.replace(coll.getSourceId(i), newIndex); hasReopenedAnything = true; } @@ -265,9 +269,9 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema) { assert(_ctx.getThreadingService().master().isCurrentThread()); for (;;) { - Schema::SP activeFusionSchema; - Schema::SP activeFusionPrunedSchema; - Schema::SP newActiveFusionPrunedSchema; + std::shared_ptr activeFusionSchema; + std::shared_ptr activeFusionPrunedSchema; + std::shared_ptr newActiveFusionPrunedSchema; { LockGuard lock(_state_lock); activeFusionSchema = _activeFusionSchema; @@ -276,10 +280,10 @@ IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema) if (!activeFusionSchema) return; // No active fusion if (!activeFusionPrunedSchema) { - Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema); + std::unique_ptr newSchema = Schema::intersect(*activeFusionSchema, schema); newActiveFusionPrunedSchema = std::move(newSchema); } else { - Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema); + std::unique_ptr newSchema = Schema::intersect(*activeFusionPrunedSchema, schema); newActiveFusionPrunedSchema = std::move(newSchema); } { @@ -302,7 +306,7 @@ IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir) removeOldDiskIndexes(); } -IDiskIndex::SP +std::shared_ptr IndexMaintainer::loadDiskIndex(const string &indexDir) { // Called by a flush worker thread OR CTOR (in document db init executor thread) @@ -323,7 +327,7 @@ IndexMaintainer::loadDiskIndex(const string &indexDir) return retval; } -IDiskIndex::SP +std::shared_ptr IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) { // Called by a flush worker thread OR document db executor thread @@ -346,7 +350,7 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) return retval; } -IDiskIndex::SP +std::shared_ptr IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, uint32_t indexId, uint32_t docIdLimit, @@ -356,7 +360,7 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, // Called by a flush worker thread const string flushDir = getFlushDir(indexId); memoryIndex.flushToDisk(flushDir, docIdLimit, serialNum); - Schema::SP prunedSchema(memoryIndex.getPrunedSchema()); + std::shared_ptr prunedSchema(memoryIndex.getPrunedSchema()); if (prunedSchema) { updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh); } @@ -366,8 +370,8 @@ IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, return loadDiskIndex(flushDir); } -ISearchableIndexCollection::UP -IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList) +std::unique_ptr +IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, std::unique_ptr sourceList) { // Called by CTOR (in document db init executor thread) uint32_t fusion_id = spec.last_fusion_id; @@ -386,8 +390,8 @@ namespace { using LockGuard = std::lock_guard; -ISearchableIndexCollection::SP -getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false) +std::shared_ptr +getLeaf(const LockGuard &newSearchLock, const std::shared_ptr& is, bool warn=false) { if (dynamic_cast(is.get()) != nullptr) { if (warn) { @@ -408,11 +412,11 @@ getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & i * Caller must hold _state_lock (SL). */ void -IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source) +IndexMaintainer::replaceSource(uint32_t sourceId, const std::shared_ptr& source) { assert(_ctx.getThreadingService().master().isCurrentThread()); LockGuard lock(_new_search_lock); - ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + std::unique_ptr indexes = createNewSourceCollection(lock); indexes->replace(sourceId, source); swapInNewIndex(lock, std::move(indexes), *source); } @@ -423,7 +427,7 @@ IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &sou */ void IndexMaintainer::swapInNewIndex(LockGuard & guard, - ISearchableIndexCollection::SP indexes, + std::shared_ptr indexes, IndexSearchable & source) { assert(indexes->valid()); @@ -448,19 +452,19 @@ IndexMaintainer::swapInNewIndex(LockGuard & guard, * Caller must hold _state_lock (SL). */ void -IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source) +IndexMaintainer::appendSource(uint32_t sourceId, const std::shared_ptr& source) { assert(_ctx.getThreadingService().master().isCurrentThread()); LockGuard lock(_new_search_lock); - ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + std::unique_ptr indexes = createNewSourceCollection(lock); indexes->append(sourceId, source); swapInNewIndex(lock, std::move(indexes), *source); } -ISearchableIndexCollection::UP +std::unique_ptr IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock) { - ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list)); + std::shared_ptr currentLeaf(getLeaf(newSearchLock, _source_list)); return std::make_unique(_selector, *currentLeaf); } @@ -482,13 +486,13 @@ IndexMaintainer::FlushArgs::FlushArgs(FlushArgs &&) = default; IndexMaintainer::FlushArgs & IndexMaintainer::FlushArgs::operator=(FlushArgs &&) = default; bool -IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) +IndexMaintainer::doneInitFlush(FlushArgs *args, std::shared_ptr* new_index) { // Called by initFlush via reconfigurer assert(_ctx.getThreadingService().master().isCurrentThread()); LockGuard state_lock(_state_lock); args->old_index = _current_index; - args->old_absolute_id = _current_index_id + _last_fusion_id; + args->old_absolute_id = get_absolute_id(); args->old_source_list = _source_list; string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id)); args->flush_serial_num = current_serial_num(); @@ -513,9 +517,7 @@ IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) if (!args->_skippedEmptyLast) { // Keep on using same source selector with extended valid range args->save_info = getSourceSelector().extractSaveInfo(selector_name); - // XXX: Overflow issue in source selector - _current_index_id = getNewAbsoluteId() - _last_fusion_id; - assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + set_id_for_new_memory_index(); _selector->setDefaultSource(_current_index_id); _source_selector_changes = 0; } @@ -604,10 +606,10 @@ IndexMaintainer::flushMemoryIndex(FlushArgs &args, // Called by a flush worker thread ChangeGens changeGens = getChangeGens(); IMemoryIndex &memoryIndex = *args.old_index; - Schema::SP prunedSchema = memoryIndex.getPrunedSchema(); - IDiskIndex::SP diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id, - docIdLimit, args.flush_serial_num, - saveInfo); + std::shared_ptr prunedSchema = memoryIndex.getPrunedSchema(); + std::shared_ptr diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id, + docIdLimit, args.flush_serial_num, + saveInfo); // Post processing after memory index has been written to disk and // opened as disk index. args._changeGens = changeGens; @@ -619,7 +621,7 @@ IndexMaintainer::flushMemoryIndex(FlushArgs &args, void -IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex) +IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, std::shared_ptr& diskIndex) { // Called by a flush worker thread for (;;) { @@ -631,12 +633,12 @@ IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskInde return; } ChangeGens changeGens = getChangeGens(); - Schema::SP prunedSchema = args.old_index->getPrunedSchema(); + std::shared_ptr prunedSchema = args.old_index->getPrunedSchema(); const string indexDir = getFlushDir(args.old_absolute_id); if (prunedSchema) { updateDiskIndexSchema(indexDir, *prunedSchema, noSerialNumHigh); } - IDiskIndex::SP reloadedDiskIndex = reloadDiskIndex(*diskIndex); + std::shared_ptr reloadedDiskIndex = reloadDiskIndex(*diskIndex); diskIndex = reloadedDiskIndex; args._changeGens = changeGens; args._prunedSchema = prunedSchema; @@ -645,7 +647,7 @@ IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskInde bool -IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) { +IndexMaintainer::doneFlush(FlushArgs *args, std::shared_ptr *disk_index) { // Called by doFlush via reconfigurer assert(_ctx.getThreadingService().master().isCurrentThread()); LockGuard state_lock(_state_lock); @@ -683,7 +685,7 @@ IndexMaintainer::canRunFusion(const FusionSpec &spec) const } bool -IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) +IndexMaintainer::doneFusion(FusionArgs *args, std::shared_ptr* new_index) { // Called by runFusion via reconfigurer assert(_ctx.getThreadingService().master().isCurrentThread()); @@ -711,12 +713,12 @@ IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) _activeFusionPrunedSchema.reset(); } - ISearchableIndexCollection::SP currentLeaf; + std::shared_ptr currentLeaf; { LockGuard lock(_new_search_lock); currentLeaf = getLeaf(lock, _source_list); } - ISearchableIndexCollection::UP fsc = + std::unique_ptr fsc = IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index); fsc->setCurrentIndex(_current_index_id); @@ -732,7 +734,7 @@ IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr warmIndex; { LockGuard state_lock(_state_lock); if (keepAlive == _source_list) { @@ -786,7 +788,7 @@ has_matching_interleaved_features(const Schema& old_schema, const Schema& new_sc void -IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr& newIndex) { assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor LockGuard state_lock(_state_lock); @@ -794,11 +796,11 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) args._oldSchema = _schema; // Delay destruction args._oldIndex = _current_index; // Delay destruction args._oldSourceList = _source_list; // Delay destruction - uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; + uint32_t oldAbsoluteId = get_absolute_id(); string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId)); SerialNum freezeSerialNum = current_serial_num(); bool dropEmptyLast = false; - SaveInfo::UP saveInfo; + std::unique_ptr saveInfo; LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum); { @@ -811,9 +813,7 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) if (!dropEmptyLast) { // Keep on using same source selector with extended valid range saveInfo = getSourceSelector().extractSaveInfo(selectorName); - // XXX: Overflow issue in source selector - _current_index_id = getNewAbsoluteId() - _last_fusion_id; - assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + set_id_for_new_memory_index(); _selector->setDefaultSource(_current_index_id); // Extra index to flush next time flushing is performed _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); @@ -842,7 +842,7 @@ IndexMaintainer::getSchema(void) const return _schema; } -Schema::SP +std::shared_ptr IndexMaintainer::getActiveFusionPrunedSchema(void) const { LockGuard lock(_index_update_lock); @@ -938,8 +938,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff); assert(_last_fusion_id == _selector->getBaseId()); } - _current_index_id = getNewAbsoluteId() - _last_fusion_id; - assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + set_id_for_new_memory_index(); _selector->setDefaultSource(_current_index_id); auto sourceList = loadDiskIndexes(spec, std::make_unique(_selector)); _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num()); @@ -961,7 +960,7 @@ IndexMaintainer::~IndexMaintainer() _selector.reset(); } -FlushTask::UP +std::unique_ptr IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats) { assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits @@ -970,7 +969,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat set_current_serial_num(std::max(current_serial_num(), serialNum)); } - IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num())); + std::shared_ptr new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num())); FlushArgs args; args.stats = stats; // Ensure that all index thread tasks accessing memory index have completed. @@ -990,7 +989,7 @@ IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stat LOG(debug, "No memory index to flush. Update serial number and flush time to current: " "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)", flush_serial_num(), vespalib::to_s(_lastFlushTime.time_since_epoch())); - return FlushTask::UP(); + return {}; } SerialNum realSerialNum = args.flush_serial_num; return makeLambdaFlushTask([this, myargs=std::move(args)]() mutable { doFlush(std::move(myargs)); }, realSerialNum); @@ -1115,12 +1114,12 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr prunedSchema = getActiveFusionPrunedSchema(); if (prunedSchema) { updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh); } ChangeGens changeGens = getChangeGens(); - IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir)); + std::shared_ptr new_index(loadDiskIndex(new_fusion_dir)); remove_fusion_index_guard.reset(); // Post processing after fusion operation has completed and new disk @@ -1142,7 +1141,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr diskIndex2; diskIndex2 = reloadDiskIndex(*new_index); new_index = diskIndex2; args._changeGens = changeGens; @@ -1196,7 +1195,7 @@ IndexMaintainer::getFusionStats() const { // Called by flush engine scheduler thread (from getFlushTargets()) FusionStats stats; - IndexSearchable::SP source_list; + std::shared_ptr source_list; { LockGuard lock(_new_search_lock); @@ -1315,7 +1314,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) { assert(_ctx.getThreadingService().master().isCurrentThread()); pruneRemovedFields(schema, serialNum); - IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num())); + std::shared_ptr new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num())); SetSchemaArgs args; args._newSchema = schema; @@ -1331,8 +1330,8 @@ void IndexMaintainer::pruneRemovedFields(const Schema &schema, SerialNum serialNum) { assert(_ctx.getThreadingService().master().isCurrentThread()); - ISearchableIndexCollection::SP new_source_list; - IIndexCollection::SP coll = getSourceCollection(); + std::shared_ptr new_source_list; + std::shared_ptr coll = getSourceCollection(); updateIndexSchemas(*coll, schema, serialNum); updateActiveFusionPrunedSchema(schema); { diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h index 5cf8c2f67d5..963e669bc4c 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h @@ -2,18 +2,11 @@ #pragma once #include "iindexmanager.h" -#include "disk_indexes.h" #include "fusionspec.h" -#include "idiskindex.h" #include "iindexmaintaineroperations.h" #include "indexdisklayout.h" -#include "indexmaintainerconfig.h" #include "indexmaintainercontext.h" -#include "imemoryindex.h" #include "warmupindexcollection.h" -#include "ithreadingservice.h" -#include "indexsearchable.h" -#include "indexcollection.h" #include #include #include @@ -27,6 +20,9 @@ namespace vespalib { class Gate; } namespace searchcorespi::index { +class DiskIndexes; +class IndexMaintainerConfig; + /** * The IndexMaintainer provides a holistic view of a set of disk and * memory indexes. It allows updating the active memory index, enables search @@ -42,16 +38,15 @@ class IndexMaintainer : public IIndexManager, public: using SaveInfo = search::FixedSourceSelector::SaveInfo; using SerialNum = search::SerialNum; - using SaveInfoSP = std::shared_ptr; - IMemoryIndex::SP _index; - SerialNum _serialNum; - SaveInfoSP _saveInfo; - uint32_t _absoluteId; + std::shared_ptr _index; + SerialNum _serialNum; + std::shared_ptr _saveInfo; + uint32_t _absoluteId; - FrozenMemoryIndexRef(const IMemoryIndex::SP &index, + FrozenMemoryIndexRef(const std::shared_ptr &index, SerialNum serialNum, - SaveInfo::UP saveInfo, + std::unique_ptr saveInfo, uint32_t absoluteId) : _index(index), _serialNum(serialNum), @@ -78,20 +73,20 @@ class IndexMaintainer : public IIndexManager, const vespalib::string _base_dir; const WarmupConfig _warmupConfig; - DiskIndexes::SP _disk_indexes; + std::shared_ptr _disk_indexes; IndexDiskLayout _layout; - Schema _schema; // Protected by SL + IUL - Schema::SP _activeFusionSchema; // Protected by SL + IUL + Schema _schema; // Protected by SL + IUL + std::shared_ptr _activeFusionSchema; // Protected by SL + IUL // Protected by SL + IUL - Schema::SP _activeFusionPrunedSchema; + std::shared_ptr _activeFusionPrunedSchema; uint32_t _source_selector_changes; // Protected by IUL // _selector is protected by SL + IUL - ISourceSelector::SP _selector; - ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread + std::shared_ptr _selector; + std::shared_ptr _source_list; // Protected by SL + NSL, only set by master thread uint32_t _last_fusion_id; // Protected by SL + IUL uint32_t _next_id; // Protected by SL + IUL uint32_t _current_index_id; // Protected by SL + IUL - IMemoryIndex::SP _current_index; // Protected by SL + IUL + std::shared_ptr _current_index; // Protected by SL + IUL bool _flush_empty_current_index; std::atomic _current_serial_num;// Writes protected by IUL std::atomic _flush_serial_num; // Writes protected by SL @@ -130,24 +125,26 @@ class IndexMaintainer : public IIndexManager, * and pruning of removed fields, since this will trigger more retries for * some of the operations. */ - std::mutex _state_lock; // Outer lock (SL) - mutable std::mutex _index_update_lock; // Inner lock (IUL) - mutable std::mutex _new_search_lock; // Inner lock (NSL) - std::mutex _remove_lock; // Lock for removing indexes. - // Protected by SL + IUL - FusionSpec _fusion_spec; // Protected by FL - mutable std::mutex _fusion_lock; // Fusion spec lock (FL) - uint32_t _maxFlushed; - uint32_t _maxFrozen; - ChangeGens _changeGens; // Protected by SL + IUL - std::mutex _schemaUpdateLock; // Serialize rewrite of schema + std::mutex _state_lock; // Outer lock (SL) + mutable std::mutex _index_update_lock; // Inner lock (IUL) + mutable std::mutex _new_search_lock; // Inner lock (NSL) + std::mutex _remove_lock; // Lock for removing indexes. + FusionSpec _fusion_spec; // Protected by FL + mutable std::mutex _fusion_lock; // Fusion spec lock (FL) + uint32_t _maxFlushed; // Protected by NSL + const uint32_t _maxFrozen; + ChangeGens _changeGens; // Protected by SL + IUL + std::mutex _schemaUpdateLock; // Serialize rewrite of schema const search::TuneFileAttributes _tuneFileAttributes; const IndexMaintainerContext _ctx; - IIndexMaintainerOperations &_operations; + IIndexMaintainerOperations& _operations; search::FixedSourceSelector & getSourceSelector() { return static_cast(*_selector); } const search::FixedSourceSelector & getSourceSelector() const { return static_cast(*_selector); } - uint32_t getNewAbsoluteId(); + // get absolute id of current memory index, caller holds SL or IUL + uint32_t get_absolute_id() const noexcept { return _last_fusion_id + _current_index_id; } + // set id for new memory index, other callers than constructor holds SL and IUL + void set_id_for_new_memory_index(); vespalib::string getFlushDir(uint32_t sourceId) const; vespalib::string getFusionDir(uint32_t sourceId) const; @@ -168,26 +165,26 @@ class IndexMaintainer : public IIndexManager, void updateActiveFusionPrunedSchema(const Schema &schema); void deactivateDiskIndexes(vespalib::string indexDir); - IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir); - IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex); + std::shared_ptr loadDiskIndex(const vespalib::string &indexDir); + std::shared_ptr reloadDiskIndex(const IDiskIndex &oldIndex); - IDiskIndex::SP flushMemoryIndex(IMemoryIndex &memoryIndex, - uint32_t indexId, - uint32_t docIdLimit, - SerialNum serialNum, - search::FixedSourceSelector::SaveInfo &saveInfo); + std::shared_ptr flushMemoryIndex(IMemoryIndex &memoryIndex, + uint32_t indexId, + uint32_t docIdLimit, + SerialNum serialNum, + search::FixedSourceSelector::SaveInfo &saveInfo); - ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList); - void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source); - void appendSource(uint32_t sourceId, const IndexSearchable::SP &source); - void swapInNewIndex(LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source); - ISearchableIndexCollection::UP createNewSourceCollection(const LockGuard &newSearchLock); + std::unique_ptr loadDiskIndexes(const FusionSpec &spec, std::unique_ptr sourceList); + void replaceSource(uint32_t sourceId, const std::shared_ptr& source); + void appendSource(uint32_t sourceId, const std::shared_ptr& source); + void swapInNewIndex(LockGuard & guard, std::shared_ptr indexes, IndexSearchable & source); + std::unique_ptr createNewSourceCollection(const LockGuard &newSearchLock); struct FlushArgs { - IMemoryIndex::SP old_index; // Last memory index + std::shared_ptr old_index; // Last memory index uint32_t old_absolute_id; - ISearchableIndexCollection::SP old_source_list; // Delays destruction - search::FixedSourceSelector::SaveInfo::SP save_info; + std::shared_ptr old_source_list; // Delays destruction + std::shared_ptr save_info; SerialNum flush_serial_num; searchcorespi::FlushStats * stats; bool _skippedEmptyLast; // Don't flush empty memory index @@ -198,7 +195,7 @@ class IndexMaintainer : public IIndexManager, // or data structure limitations). FrozenMemoryIndexRefs _extraIndexes; ChangeGens _changeGens; - Schema::SP _prunedSchema; + std::shared_ptr _prunedSchema; FlushArgs(); FlushArgs(const FlushArgs &) = delete; @@ -208,15 +205,15 @@ class IndexMaintainer : public IIndexManager, ~FlushArgs(); }; - bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index); + bool doneInitFlush(FlushArgs *args, std::shared_ptr *new_index); void doFlush(FlushArgs args); void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds); void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds); void updateFlushStats(const FlushArgs &args); void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit, search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds); - void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex); - bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index); + void reconfigureAfterFlush(FlushArgs &args, std::shared_ptr& diskIndex); + bool doneFlush(FlushArgs *args, std::shared_ptr *disk_index); class FusionArgs { @@ -224,8 +221,8 @@ class IndexMaintainer : public IIndexManager, uint32_t _new_fusion_id; ChangeGens _changeGens; Schema _schema; - Schema::SP _prunedSchema; - ISearchableIndexCollection::SP _old_source_list; // Delays destruction + std::shared_ptr _prunedSchema; + std::shared_ptr _old_source_list; // Delays destruction FusionArgs(); ~FusionArgs(); @@ -233,23 +230,23 @@ class IndexMaintainer : public IIndexManager, void scheduleFusion(const FlushIds &flushIds); bool canRunFusion(const FusionSpec &spec) const; - bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index); + bool doneFusion(FusionArgs *args, std::shared_ptr *new_index); class SetSchemaArgs { public: Schema _newSchema; Schema _oldSchema; - IMemoryIndex::SP _oldIndex; - ISearchableIndexCollection::SP _oldSourceList; // Delays destruction + std::shared_ptr _oldIndex; + std::shared_ptr _oldSourceList; // Delays destruction SetSchemaArgs(); ~SetSchemaArgs(); }; - void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex); + void doneSetSchema(SetSchemaArgs &args, std::shared_ptr& newIndex); Schema getSchema(void) const; - Schema::SP getActiveFusionPrunedSchema() const; + std::shared_ptr getActiveFusionPrunedSchema() const; search::TuneFileAttributes getAttrTune(); ChangeGens getChangeGens(); @@ -289,7 +286,7 @@ public: * Starts a new MemoryIndex, and dumps the previous one to disk. * Updates flush stats when finished if specified. **/ - FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats); + std::unique_ptr initFlush(SerialNum serialNum, FlushStats * stats); FusionSpec getFusionSpec(); /** @@ -353,12 +350,12 @@ public: return flush_serial_num(); } - IIndexCollection::SP getSourceCollection() const { + std::shared_ptr getSourceCollection() const { LockGuard lock(_new_search_lock); return _source_list; } - searchcorespi::IndexSearchable::SP getSearchable() const override { + std::shared_ptr getSearchable() const override { LockGuard lock(_new_search_lock); return _source_list; } -- cgit v1.2.3