diff options
author | Tor Egge <Tor.Egge@yahooinc.com> | 2023-10-10 14:23:22 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-10 14:23:22 +0200 |
commit | 42707ddb5a30dc4e530e2f5d2d56539462a62ad9 (patch) | |
tree | 3a2b418f5748dad750d9bb249be3003da8f6a3ff | |
parent | f36d2cf35428720fc40f330ed2ce27b325e175db (diff) | |
parent | 26a777173863791c024b56aab805bce5737dcf00 (diff) |
Merge pull request #28840 from vespa-engine/toregge/flush-indexes-when-enabling-interleaved-features
Flush memory indexes to disk then fusion disk indexes as soon as
9 files changed, 324 insertions, 46 deletions
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index f7bccd43576..ea447a85e06 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -31,7 +31,9 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/fastos/file.h> #include <filesystem> +#include <ostream> #include <set> +#include <sstream> #include <thread> #include <vespa/log/log.h> @@ -87,9 +89,9 @@ const uint32_t docid = 1; auto add_fields = [](auto& header) { header.addField(field_name, document::DataType::T_STRING); }; -Schema getSchema() { +Schema getSchema(std::optional<bool> interleaved_features) { DocBuilder db(add_fields); - return SchemaBuilder(db).add_all_indexes().build(); + return SchemaBuilder(db).add_all_indexes(interleaved_features).build(); } void removeTestData() { @@ -117,7 +119,7 @@ struct IndexManagerTest : public ::testing::Test { DummyFileHeaderContext _fileHeaderContext; TransportAndExecutorService _service; std::unique_ptr<IndexManager> _index_manager; - Schema _schema; + std::optional<bool> _interleaved_features; DocBuilder _builder; IndexManagerTest() @@ -126,7 +128,7 @@ struct IndexManagerTest : public ::testing::Test { _fileHeaderContext(), _service(1), _index_manager(), - _schema(getSchema()), + _interleaved_features(), _builder(add_fields) { removeTestData(); @@ -139,7 +141,7 @@ struct IndexManagerTest : public ::testing::Test { } template <class FunctionType> - inline void runAsMaster(FunctionType &&function) { + void runAsMaster(FunctionType &&function) { vespalib::Gate gate; _service.write().master().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); @@ -148,7 +150,7 @@ struct IndexManagerTest : public ::testing::Test { gate.await(); } template <class FunctionType> - inline void runAsIndex(FunctionType &&function) { + void runAsIndex(FunctionType &&function) { vespalib::Gate gate; _service.write().index().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); @@ -157,8 +159,9 @@ struct IndexManagerTest : public ::testing::Test { gate.await(); } void flushIndexManager(); + void run_fusion(); Document::UP addDocument(uint32_t docid); - void resetIndexManager(); + void resetIndexManager(SerialNum serial_num = 1); void removeDocument(uint32_t docId, SerialNum serialNum) { vespalib::Gate gate; runAsIndex([&]() { @@ -177,12 +180,23 @@ struct IndexManagerTest : public ::testing::Test { } void assertStats(uint32_t expNumDiskIndexes, uint32_t expNumMemoryIndexes, - SerialNum expLastiskIndexSerialNum, + SerialNum expLastDiskIndexSerialNum, SerialNum expLastMemoryIndexSerialNum); IIndexCollection::SP get_source_collection() const { return _index_manager->getMaintainer().getSourceCollection(); } + void set_schema(const Schema& schema, SerialNum serial_num) + { + runAsMaster([&]() { _index_manager->setSchema(schema, serial_num); }); + } + + bool has_pending_urgent_flush() const { + return _index_manager->has_pending_urgent_flush(); + } + bool has_urgent_memory_index_flush() const; + bool has_urgent_fusion() const; + void assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion); }; void @@ -197,6 +211,31 @@ IndexManagerTest::flushIndexManager() } } +void +IndexManagerTest::run_fusion() +{ + IndexFusionTarget target(_index_manager->getMaintainer()); + std::unique_ptr<vespalib::Executor::Task> task; + runAsMaster([&]() { task = target.initFlush(0, std::make_shared<search::FlushToken>()); }); + if (task) { + task->run(); + } +} + +bool +IndexManagerTest::has_urgent_memory_index_flush() const +{ + IndexFlushTarget target(_index_manager->getMaintainer()); + return target.needUrgentFlush(); +} + +bool +IndexManagerTest::has_urgent_fusion() const +{ + IndexFusionTarget target(_index_manager->getMaintainer()); + return target.needUrgentFlush(); +} + Document::UP IndexManagerTest::addDocument(uint32_t id) { @@ -211,12 +250,13 @@ IndexManagerTest::addDocument(uint32_t id) } void -IndexManagerTest::resetIndexManager() +IndexManagerTest::resetIndexManager(SerialNum serial_num) { _index_manager.reset(); - _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(), 1, + _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(_interleaved_features), serial_num, _reconfigurer, _service.write(), _service.shared(), TuneFileIndexManager(), TuneFileAttributes(), _fileHeaderContext); + _serial_num = std::max(serial_num, _index_manager->getFlushedSerialNum()); } void @@ -240,6 +280,15 @@ IndexManagerTest::assertStats(uint32_t expNumDiskIndexes, uint32_t expNumMemoryI EXPECT_EQ(expLastMemoryIndexSerialNum, lastMemoryIndexSerialNum); } +void +IndexManagerTest::assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion) +{ + SCOPED_TRACE(label); + EXPECT_EQ(pending, has_pending_urgent_flush()); + EXPECT_EQ(flush, has_urgent_memory_index_flush()); + EXPECT_EQ(fusion, has_urgent_fusion()); +} + TEST_F(IndexManagerTest, require_that_empty_memory_index_is_not_flushed) { auto sources = get_source_collection(); @@ -295,10 +344,11 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) IndexFlushTarget target(_index_manager->getMaintainer()); EXPECT_EQ(vespalib::system_time(), target.getLastFlushTime()); vespalib::Executor::Task::UP flushTask; - runAsMaster([&]() { flushTask = target.initFlush(1, std::make_shared<search::FlushToken>()); }); + runAsMaster([&]() { flushTask = target.initFlush(2, std::make_shared<search::FlushToken>()); }); flushTask->run(); EXPECT_TRUE(FastOS_File::Stat("test_data/index.flush.1", &stat)); EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime()); + EXPECT_EQ(2u, target.getFlushedSerialNum()); sources = get_source_collection(); EXPECT_EQ(2u, sources->getSourceCount()); @@ -317,14 +367,15 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) resetIndexManager(); IndexFlushTarget target(_index_manager->getMaintainer()); EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime()); + EXPECT_EQ(2u, target.getFlushedSerialNum()); // updated serial number & flush time when nothing to flush std::this_thread::sleep_for(2s); std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch()); vespalib::Executor::Task::UP task; - runAsMaster([&]() { task = target.initFlush(2, std::make_shared<search::FlushToken>()); }); + runAsMaster([&]() { task = target.initFlush(3, std::make_shared<search::FlushToken>()); }); EXPECT_FALSE(task); - EXPECT_EQ(2u, target.getFlushedSerialNum()); + EXPECT_EQ(3u, target.getFlushedSerialNum()); EXPECT_LT(stat._modifiedTime, target.getLastFlushTime()); EXPECT_NEAR(now.count(), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch()).count(), 2); } @@ -408,7 +459,7 @@ VESPA_THREAD_STACK_TAG(push_executor) TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) { - Schema schema(getSchema()); + Schema schema(getSchema(_interleaved_features)); FieldIndexCollection fic(schema, MockFieldLengthInspector()); auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2); auto pushThreads = SequencedTaskExecutor::create(push_executor, 2); @@ -773,11 +824,11 @@ TEST_F(IndexManagerTest, require_that_indexes_manager_stats_can_be_generated) { assertStats(0, 1, 0, 0); addDocument(1); - assertStats(0, 1, 0, 1); + assertStats(0, 1, 0, 2); flushIndexManager(); - assertStats(1, 1, 1, 1); + assertStats(1, 1, 2, 2); addDocument(2); - assertStats(1, 1, 1, 2); + assertStats(1, 1, 2, 3); } TEST_F(IndexManagerTest, require_that_compact_lid_space_works) @@ -879,6 +930,151 @@ TEST_F(IndexManagerTest, fusion_can_be_stopped) EXPECT_EQ(2u, spec.flush_ids[1]); } +struct EnableInterleavedFeaturesParam +{ + enum class Restart { + NONE, + RESTART1, + RESTART2 + }; + vespalib::string name = "no_restart"; + Restart restart = Restart::NONE; + bool doc = false; // Feed doc after enabling interleaved fatures + bool pruned_config = false; // Original config has been pruned + + EnableInterleavedFeaturesParam no_doc_restart1() && { + name = "restart1"; + restart = Restart::RESTART1; + return *this; + } + EnableInterleavedFeaturesParam doc_restart1() && { + name = "doc_restart1"; + restart = Restart::RESTART1; + doc = true; + return *this; + } + EnableInterleavedFeaturesParam doc_restart2() && { + name = "doc_restart2"; + restart = Restart::RESTART2; + doc = true; + return *this; + } + EnableInterleavedFeaturesParam doc_restart2_pruned_config() && { + name = "doc_restart2_pruned_config"; + restart = Restart::RESTART2; + doc = true; + pruned_config = true; + return *this; + } +}; + +std::ostream& operator<<(std::ostream& os, const EnableInterleavedFeaturesParam& param) +{ + os << param.name; + return os; +} + +std::string +param_as_string(const testing::TestParamInfo<std::tuple<bool, bool, EnableInterleavedFeaturesParam>>& info) +{ + std::ostringstream os; + auto& param = info.param; + os << (std::get<0>(param) ? "disk_" : ""); + os << (std::get<1>(param) ? "m1_" : "m0_"); + os << std::get<2>(param); + return os.str(); +} + +class IndexManagerEnableInterleavedFeaturesTest : public IndexManagerTest, + public testing::WithParamInterface<std::tuple<bool, bool, EnableInterleavedFeaturesParam>> +{ +protected: + void enable_interleaved_features(const vespalib::string& label, bool old_config_docs, bool flushed_interleaved_features, std::optional<SerialNum> serial_num = std::nullopt); +}; + +void +IndexManagerEnableInterleavedFeaturesTest::enable_interleaved_features(const vespalib::string& label, bool old_config_docs, bool flushed_interleaved_features, std::optional<SerialNum> serial_num) +{ + if (!serial_num.has_value()) { + serial_num = ++_serial_num; + } + set_schema(getSchema(true), serial_num.value()); + assert_urgent(label, old_config_docs, old_config_docs && !flushed_interleaved_features, old_config_docs && flushed_interleaved_features); +} + +TEST_P(IndexManagerEnableInterleavedFeaturesTest, enable_interleaved_features) +{ + using Restart = EnableInterleavedFeaturesParam::Restart; + const auto& params = GetParam(); + // State before enabling interleaved features + bool initial_disk_index = std::get<0>(params); + bool nonempty_memory_index = std::get<1>(params); + // State for after enabling interleaved features + const auto& enable_params = std::get<2>(params); + bool old_config_docs = false; + + _interleaved_features = false; + SerialNum config_serial_num = 1; + resetIndexManager(config_serial_num); + if (initial_disk_index) { + // Feed doc to memory index without interleaved features and flush + // memory index to disk + addDocument(docid); + old_config_docs = true; + flushIndexManager(); + } + if (nonempty_memory_index) { + // Feed doc to memory index without interleaved features + addDocument(docid + 1); + old_config_docs = true; + } + assert_urgent("setup", false, false, false); + enable_interleaved_features("enable interleaved features", old_config_docs, false); + auto schema_change_serial_num = _serial_num; + EXPECT_EQ(2 + (initial_disk_index ? 1 : 0) + (nonempty_memory_index ? 1 : 0), schema_change_serial_num); + if (enable_params.restart == Restart::RESTART1) { + // Restart after flushing 1st memory index without interleaved features + resetIndexManager(config_serial_num); + assert_urgent("after restart1", false, false, false); + if (nonempty_memory_index) { + addDocument(docid + 1); + } + EXPECT_EQ(schema_change_serial_num, _serial_num + 1); + enable_interleaved_features("replay enable interleaved features after restart1", old_config_docs, false); + } + if (enable_params.doc) { + // Feed second doc to memory index with interleaved features + addDocument(docid + 2); + } + SerialNum disk2_serial_num = schema_change_serial_num + (enable_params.doc ? 1 : 0); + EXPECT_EQ(disk2_serial_num, _serial_num); + flushIndexManager(); + assert_urgent("after 2nd flush", old_config_docs, false, old_config_docs); + if (enable_params.pruned_config) { + // Original config has been pruned + _interleaved_features = true; + config_serial_num = schema_change_serial_num; + } + if (enable_params.restart == Restart::RESTART2) { + // Restart after flushing 2nd memory index with interleaved fatures + resetIndexManager(config_serial_num); + assert_urgent("after restart2", old_config_docs, false, old_config_docs); + EXPECT_EQ(disk2_serial_num, _serial_num); + enable_interleaved_features("replay enable interleaved features after restart2", old_config_docs, true, schema_change_serial_num); + } + run_fusion(); + assert_urgent("after fusion", false, false, false); +} + +auto test_values = testing::Combine(testing::Bool(), testing::Bool(), + testing::Values(EnableInterleavedFeaturesParam(), + EnableInterleavedFeaturesParam().no_doc_restart1(), + EnableInterleavedFeaturesParam().doc_restart1(), + EnableInterleavedFeaturesParam().doc_restart2(), + EnableInterleavedFeaturesParam().doc_restart2_pruned_config())); + +INSTANTIATE_TEST_SUITE_P(MultiIndexManagerEnableInterleavedFeaturesTest, IndexManagerEnableInterleavedFeaturesTest, test_values, param_as_string); + } // namespace int diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h index b0243763824..c1ad4bb316b 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h @@ -129,6 +129,9 @@ public: void setMaxFlushed(uint32_t maxFlushed) override { _maintainer.setMaxFlushed(maxFlushed); } + bool has_pending_urgent_flush() const override { + return _maintainer.has_pending_urgent_flush(); + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h index 12a732ca081..a9f386bfc12 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h @@ -29,6 +29,7 @@ struct MockIndexManager : public searchcorespi::IIndexManager void heartBeat(SerialNum) override {} void compactLidSpace(uint32_t, SerialNum) override {} void setMaxFlushed(uint32_t) override { } + bool has_pending_urgent_flush() const override { return false; } }; } diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h index 44e1ad306db..b6bbe416562 100644 --- a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h +++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h @@ -200,6 +200,15 @@ public: * @param maxFlushed The max number of flushed indexes before fusion is urgent. */ virtual void setMaxFlushed(uint32_t maxFlushed) = 0; + + /** + * Checks if we have a pending urgent flush due to a recent + * schema change (e.g. regeneration of interleaved features in + * disk indexes). + * + * @return whether an urgent flush is pending + */ + virtual bool has_pending_urgent_flush() const = 0; }; } // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp index cfb4c3e609f..051175be9af 100644 --- a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp @@ -76,6 +76,7 @@ IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const object.setLong("lastSerialNum", _mgr->getCurrentSerialNum()); if (full) { IndexManagerStats stats(*_mgr); + object.setBool("pending_urgent_flush", _mgr->has_pending_urgent_flush()); Cursor &diskIndexArrayCursor = object.setArray("diskIndexes"); for (const auto &diskIndex : stats.getDiskIndexes()) { insertDiskIndex(diskIndexArrayCursor, diskIndex); diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp index 9e290c7e525..4e8201f75fd 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -44,7 +44,8 @@ IndexFlushTarget::needUrgentFlush() const // Due to limitation of 16G address space of single datastore // TODO: Even better if urgency was decided by memory index itself. bool urgent = (_numFrozenMemoryIndexes > _maxFrozenMemoryIndexes) || - (getApproxMemoryGain().gain() > ssize_t(16_Gi)); + (getApproxMemoryGain().gain() > ssize_t(16_Gi)) || + _indexMaintainer.urgent_memory_index_flush(); SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum(); LOG(debug, "Num frozen: %u Memory gain: %" PRId64 " Urgent: %d, flushedSerial=%" PRIu64, _numFrozenMemoryIndexes, getApproxMemoryGain().gain(), static_cast<int>(urgent), flushedSerial); diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp index 175f8dbd800..f889fad136f 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -69,7 +69,8 @@ IndexFusionTarget::getApproxDiskGain() const bool IndexFusionTarget::needUrgentFlush() const { - bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed) && (_fusionStats._canRunFusion); + bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed || _indexMaintainer.urgent_disk_index_fusion()) && + (_fusionStats._canRunFusion); LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent); return urgent; } diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp index 8270725f8e3..50bbbaec355 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -768,26 +768,8 @@ IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current) } } -namespace { - -bool -has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema) -{ - for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { - if (itr.hasMatchingOldFields(old_schema) && - !itr.has_matching_use_interleaved_features(old_schema)) - { - return false; - } - } - return true; -} - -} - - void -IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex) +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num) { assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor LockGuard state_lock(_state_lock); @@ -818,12 +800,12 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); } _current_index = newIndex; - // Non-matching interleaved features in schemas means that we need to - // reconstruct or drop interleaved features in posting lists. - // If so, we must flush the new index to disk even if it is empty. - // This ensures that 2x triggerFlush will run fusion - // to reconstruct or drop interleaved features in the posting lists. - _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema); + if (serial_num > flush_serial_num() && get_absolute_id() > 1) { + consider_urgent_flush(args._oldSchema, args._newSchema, get_absolute_id()); + } + // If schema changes triggered a need for urgent flush then we must + // be able to flush the new index to disk even if it is empty. + _flush_empty_current_index = (_urgent_flush_id == get_absolute_id()); } if (dropEmptyLast) { replaceSource(_current_index_id, _current_index); @@ -887,6 +869,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _last_fusion_id(), _next_id(), _current_index_id(), + _urgent_flush_id(), _current_index(), _flush_empty_current_index(false), _current_serial_num(0), @@ -950,6 +933,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, pruneRemovedFields(_schema, config.getSerialNum()); })); _ctx.getThreadingService().master().sync(); + consider_initial_urgent_flush(); } IndexMaintainer::~IndexMaintainer() @@ -1320,7 +1304,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) // Ensure that all index thread tasks accessing memory index have completed. commit_and_wait(); // Everything should be quiet now. - doneSetSchema(args, new_index); + doneSetSchema(args, new_index, serialNum); // Source collection has now changed, caller must reconfigure further // as appropriate. } @@ -1358,4 +1342,79 @@ IndexMaintainer::setMaxFlushed(uint32_t maxFlushed) _maxFlushed = maxFlushed; } +void +IndexMaintainer::consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id) +{ + // Non-matching interleaved features in schemas means that we need to + // reconstruct or drop interleaved features in posting lists. Schedule + // urgent flush until all indexes are in sync. + for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { + if (itr.hasMatchingOldFields(old_schema) && + !itr.has_matching_use_interleaved_features(old_schema)) + { + _urgent_flush_id = flush_id; + break; + } + } +} + +void +IndexMaintainer::consider_initial_urgent_flush() +{ + const Schema *prev_schema = nullptr; + std::optional<uint32_t> urgent_source_id; + auto coll = getSourceCollection(); + uint32_t count = coll->getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll->getSearchable(i); + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d != nullptr) { + auto schema = &d->getSchema(); + if (prev_schema != nullptr) { + consider_urgent_flush(*prev_schema, *schema, _last_fusion_id + coll->getSourceId(i)); + } + prev_schema = schema; + } + } +} + +uint32_t +IndexMaintainer::get_urgent_flush_id() const +{ + LockGuard lock(_index_update_lock); + return _urgent_flush_id; +} + +bool +IndexMaintainer::urgent_memory_index_flush() const +{ + LockGuard lock(_index_update_lock); + for (auto& frozen : _frozenMemoryIndexes) { + if (frozen._absoluteId == _urgent_flush_id) { + return true; + } + } + if (get_absolute_id() == _urgent_flush_id) { + return true; + } + return false; +} + +bool +IndexMaintainer::urgent_disk_index_fusion() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + auto& flush_ids = _fusion_spec.flush_ids; + return std::find(flush_ids.begin(), flush_ids.end(), urgent_flush_id) != std::end(flush_ids); +} + +bool +IndexMaintainer::has_pending_urgent_flush() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + return urgent_flush_id > _fusion_spec.last_fusion_id; +} + } diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h index 9c0d7c7373e..6db588d83ac 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h @@ -86,6 +86,7 @@ class IndexMaintainer : public IIndexManager, uint32_t _last_fusion_id; // Protected by SL + IUL uint32_t _next_id; // Protected by SL + IUL uint32_t _current_index_id; // Protected by SL + IUL + uint32_t _urgent_flush_id; // Protected by SL + IUL std::shared_ptr<IMemoryIndex> _current_index; // Protected by SL + IUL bool _flush_empty_current_index; std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL @@ -243,7 +244,7 @@ class IndexMaintainer : public IIndexManager, ~SetSchemaArgs(); }; - void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex); + void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num); Schema getSchema(void) const; std::shared_ptr<Schema> getActiveFusionPrunedSchema() const; @@ -368,6 +369,12 @@ public: IFlushTarget::List getFlushTargets() override; void setSchema(const Schema & schema, SerialNum serialNum) override ; void setMaxFlushed(uint32_t maxFlushed) override; + void consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id); + void consider_initial_urgent_flush(); + uint32_t get_urgent_flush_id() const; + bool urgent_memory_index_flush() const; + bool urgent_disk_index_fusion() const; + bool has_pending_urgent_flush() const override; }; } |