diff options
author | Tor Egge <Tor.Egge@online.no> | 2023-10-09 13:46:12 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2023-10-09 13:46:12 +0200 |
commit | 4e3dbe2b7974452ed643cd4fd4bf1daf7e4b5970 (patch) | |
tree | 5d968cd9f7c10379b9092af2f87a554ae9db656c /searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp | |
parent | b09acf5a94ff3fe7b70381478fedcc242d965c32 (diff) |
Flush memory indexes to disk then fusion disk indexes as soon as
possible when enabling interleaved features.
Diffstat (limited to 'searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp | 111 |
1 files changed, 85 insertions, 26 deletions
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp index a7828c00bc4..96a29994fbc 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -768,26 +768,8 @@ IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current) } } -namespace { - -bool -has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema) -{ - for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { - if (itr.hasMatchingOldFields(old_schema) && - !itr.has_matching_use_interleaved_features(old_schema)) - { - return false; - } - } - return true; -} - -} - - void -IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex) +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num) { assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor LockGuard state_lock(_state_lock); @@ -818,12 +800,12 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); } _current_index = newIndex; - // Non-matching interleaved features in schemas means that we need to - // reconstruct or drop interleaved features in posting lists. - // If so, we must flush the new index to disk even if it is empty. - // This ensures that 2x triggerFlush will run fusion - // to reconstruct or drop interleaved features in the posting lists. - _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema); + if (serial_num > flush_serial_num()) { + consider_urgent_flush(args._oldSchema, args._newSchema, get_absolute_id()); + } + // If schema changes triggered a need for urgent flush then we must + // be able to flush the new index to disk even if it is empty. + _flush_empty_current_index = (_urgent_flush_id == get_absolute_id()); } if (dropEmptyLast) { replaceSource(_current_index_id, _current_index); @@ -887,6 +869,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _last_fusion_id(), _next_id(), _current_index_id(), + _urgent_flush_id(), _current_index(), _flush_empty_current_index(false), _current_serial_num(0), @@ -950,6 +933,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, pruneRemovedFields(_schema, config.getSerialNum()); })); _ctx.getThreadingService().master().sync(); + consider_initial_urgent_flush(); } IndexMaintainer::~IndexMaintainer() @@ -1320,7 +1304,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) // Ensure that all index thread tasks accessing memory index have completed. commit_and_wait(); // Everything should be quiet now. - doneSetSchema(args, new_index); + doneSetSchema(args, new_index, serialNum); // Source collection has now changed, caller must reconfigure further // as appropriate. } @@ -1358,4 +1342,79 @@ IndexMaintainer::setMaxFlushed(uint32_t maxFlushed) _maxFlushed = maxFlushed; } +void +IndexMaintainer::consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id) +{ + // Non-matching interleaved features in schemas means that we need to + // reconstruct or drop interleaved features in posting lists. Schedule + // urgent flush until all indexes are in sync. + for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { + if (itr.hasMatchingOldFields(old_schema) && + !itr.has_matching_use_interleaved_features(old_schema)) + { + _urgent_flush_id = flush_id; + break; + } + } +} + +void +IndexMaintainer::consider_initial_urgent_flush() +{ + const Schema *prev_schema = nullptr; + std::optional<uint32_t> urgent_source_id; + auto coll = getSourceCollection(); + uint32_t count = coll->getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll->getSearchable(i); + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d != nullptr) { + auto schema = &d->getSchema(); + if (prev_schema != nullptr) { + consider_urgent_flush(*prev_schema, *schema, _last_fusion_id + coll->getSourceId(i)); + } + prev_schema = schema; + } + } +} + +uint32_t +IndexMaintainer::get_urgent_flush_id() const +{ + LockGuard lock(_index_update_lock); + return _urgent_flush_id; +} + +bool +IndexMaintainer::urgent_memory_index_flush() const +{ + LockGuard lock(_index_update_lock); + for (auto& frozen : _frozenMemoryIndexes) { + if (frozen._absoluteId == _urgent_flush_id) { + return true; + } + } + if (get_absolute_id() == _urgent_flush_id) { + return true; + } + return false; +} + +bool +IndexMaintainer::urgent_disk_index_fusion() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + auto& flush_ids = _fusion_spec.flush_ids; + return std::find(flush_ids.begin(), flush_ids.end(), urgent_flush_id) != std::end(flush_ids); +} + +bool +IndexMaintainer::has_pending_urgent_flush() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + return urgent_flush_id > _fusion_spec.last_fusion_id; +} + } |