author    Tor Egge <Tor.Egge@online.no>    2023-10-09 13:46:12 +0200
committer Tor Egge <Tor.Egge@online.no>    2023-10-09 13:46:12 +0200
commit    4e3dbe2b7974452ed643cd4fd4bf1daf7e4b5970 (patch)
tree      5d968cd9f7c10379b9092af2f87a554ae9db656c /searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
parent    b09acf5a94ff3fe7b70381478fedcc242d965c32 (diff)
Flush memory indexes to disk, then run fusion on the disk indexes, as soon as
possible when enabling interleaved features.
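
Background: a mismatch in interleaved features between the old and new schema means the
posting lists have to be reconstructed (or the features dropped), which is why the memory
index must be flushed and the disk indexes fused urgently. Below is a minimal sketch of the
mismatch check that drives this decision, mirroring consider_urgent_flush() in the diff
(Schema and SchemaUtil are the existing searchlib types; the include paths are assumptions):

    #include <vespa/searchcommon/common/schema.h>   // assumed include path
    #include <vespa/searchlib/index/schemautil.h>   // assumed include path

    using search::index::Schema;
    using search::index::SchemaUtil;

    // Returns true if any index field present in both schemas disagrees on the
    // use of interleaved features, which is exactly the condition that makes
    // consider_urgent_flush() record an urgent flush id.
    bool interleaved_features_mismatch(const Schema& old_schema, const Schema& new_schema)
    {
        for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
            if (itr.hasMatchingOldFields(old_schema) &&
                !itr.has_matching_use_interleaved_features(old_schema)) {
                return true;   // posting lists must be rewritten via flush + fusion
            }
        }
        return false;          // all shared index fields already agree
    }
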
Diffstat (limited to 'searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp')
-rw-r--r--  searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp  | 111
1 file changed, 85 insertions(+), 26 deletions(-)
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
index a7828c00bc4..96a29994fbc 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -768,26 +768,8 @@ IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current)
}
}
-namespace {
-
-bool
-has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema)
-{
- for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
- if (itr.hasMatchingOldFields(old_schema) &&
- !itr.has_matching_use_interleaved_features(old_schema))
- {
- return false;
- }
- }
- return true;
-}
-
-}
-
-
void
-IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex)
+IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num)
{
assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
LockGuard state_lock(_state_lock);
@@ -818,12 +800,12 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex
_frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId);
}
_current_index = newIndex;
- // Non-matching interleaved features in schemas means that we need to
- // reconstruct or drop interleaved features in posting lists.
- // If so, we must flush the new index to disk even if it is empty.
- // This ensures that 2x triggerFlush will run fusion
- // to reconstruct or drop interleaved features in the posting lists.
- _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema);
+ if (serial_num > flush_serial_num()) {
+ consider_urgent_flush(args._oldSchema, args._newSchema, get_absolute_id());
+ }
+ // If schema changes triggered a need for urgent flush then we must
+ // be able to flush the new index to disk even if it is empty.
+ _flush_empty_current_index = (_urgent_flush_id == get_absolute_id());
}
if (dropEmptyLast) {
replaceSource(_current_index_id, _current_index);
@@ -887,6 +869,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_last_fusion_id(),
_next_id(),
_current_index_id(),
+ _urgent_flush_id(),
_current_index(),
_flush_empty_current_index(false),
_current_serial_num(0),
@@ -950,6 +933,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
pruneRemovedFields(_schema, config.getSerialNum());
}));
_ctx.getThreadingService().master().sync();
+ consider_initial_urgent_flush();
}
IndexMaintainer::~IndexMaintainer()
@@ -1320,7 +1304,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
// Ensure that all index thread tasks accessing memory index have completed.
commit_and_wait();
// Everything should be quiet now.
- doneSetSchema(args, new_index);
+ doneSetSchema(args, new_index, serialNum);
// Source collection has now changed, caller must reconfigure further
// as appropriate.
}
@@ -1358,4 +1342,79 @@ IndexMaintainer::setMaxFlushed(uint32_t maxFlushed)
_maxFlushed = maxFlushed;
}
+void
+IndexMaintainer::consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id)
+{
+ // Non-matching interleaved features in schemas means that we need to
+ // reconstruct or drop interleaved features in posting lists. Schedule
+ // urgent flush until all indexes are in sync.
+ for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
+ if (itr.hasMatchingOldFields(old_schema) &&
+ !itr.has_matching_use_interleaved_features(old_schema))
+ {
+ _urgent_flush_id = flush_id;
+ break;
+ }
+ }
+}
+
+void
+IndexMaintainer::consider_initial_urgent_flush()
+{
+ const Schema *prev_schema = nullptr;
+ std::optional<uint32_t> urgent_source_id;
+ auto coll = getSourceCollection();
+ uint32_t count = coll->getSourceCount();
+ for (uint32_t i = 0; i < count; ++i) {
+ IndexSearchable &is = coll->getSearchable(i);
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d != nullptr) {
+ auto schema = &d->getSchema();
+ if (prev_schema != nullptr) {
+ consider_urgent_flush(*prev_schema, *schema, _last_fusion_id + coll->getSourceId(i));
+ }
+ prev_schema = schema;
+ }
+ }
+}
+
+uint32_t
+IndexMaintainer::get_urgent_flush_id() const
+{
+ LockGuard lock(_index_update_lock);
+ return _urgent_flush_id;
+}
+
+bool
+IndexMaintainer::urgent_memory_index_flush() const
+{
+ LockGuard lock(_index_update_lock);
+ for (auto& frozen : _frozenMemoryIndexes) {
+ if (frozen._absoluteId == _urgent_flush_id) {
+ return true;
+ }
+ }
+ if (get_absolute_id() == _urgent_flush_id) {
+ return true;
+ }
+ return false;
+}
+
+bool
+IndexMaintainer::urgent_disk_index_fusion() const
+{
+ uint32_t urgent_flush_id = get_urgent_flush_id();
+ LockGuard lock(_fusion_lock);
+ auto& flush_ids = _fusion_spec.flush_ids;
+ return std::find(flush_ids.begin(), flush_ids.end(), urgent_flush_id) != std::end(flush_ids);
+}
+
+bool
+IndexMaintainer::has_pending_urgent_flush() const
+{
+ uint32_t urgent_flush_id = get_urgent_flush_id();
+ LockGuard lock(_fusion_lock);
+ return urgent_flush_id > _fusion_spec.last_fusion_id;
+}
+
}
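
The three new predicates summarize the remaining work after such a schema change:
urgent_memory_index_flush() is true while a current or frozen memory index still carries
the urgent flush id, urgent_disk_index_fusion() is true once that index has been flushed
to disk and appears in the fusion spec, and has_pending_urgent_flush() stays true until a
fusion has covered the urgent flush id. A hypothetical caller, shown only to illustrate
how they compose; the helper type, its field names and the include path are assumptions,
and the real consumers are expected to be the index flush/fusion targets outside this file:

    #include <vespa/searchcorespi/index/indexmaintainer.h>  // assumed include path

    // Illustration only (not part of this commit): snapshot of the work left
    // before interleaved features are consistent across all indexes.
    struct UrgentIndexWork {
        bool flush_memory_index; // memory index with the urgent id must reach disk first
        bool fuse_disk_indexes;  // then fusion must cover the flushed disk index
        bool anything_pending;   // false once a fusion has covered the urgent id
    };

    UrgentIndexWork
    probe_urgent_work(const searchcorespi::index::IndexMaintainer& maintainer)
    {
        return UrgentIndexWork{
            maintainer.urgent_memory_index_flush(),
            maintainer.urgent_disk_index_fusion(),
            maintainer.has_pending_urgent_flush()
        };
    }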