diff options
author | Tor Egge <Tor.Egge@online.no> | 2023-10-09 13:46:12 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2023-10-09 13:46:12 +0200 |
commit | 4e3dbe2b7974452ed643cd4fd4bf1daf7e4b5970 (patch) | |
tree | 5d968cd9f7c10379b9092af2f87a554ae9db656c /searchcore | |
parent | b09acf5a94ff3fe7b70381478fedcc242d965c32 (diff) |
Flush memory indexes to disk then fusion disk indexes as soon as
possible when enabling interleaved features.
Diffstat (limited to 'searchcore')
9 files changed, 298 insertions, 46 deletions
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 91f585a4f45..9335a1d5583 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -31,6 +31,7 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/fastos/file.h> #include <filesystem> +#include <ostream> #include <set> #include <thread> @@ -87,9 +88,9 @@ const uint32_t docid = 1; auto add_fields = [](auto& header) { header.addField(field_name, document::DataType::T_STRING); }; -Schema getSchema() { +Schema getSchema(std::optional<bool> interleaved_features) { DocBuilder db(add_fields); - return SchemaBuilder(db).add_all_indexes().build(); + return SchemaBuilder(db).add_all_indexes(interleaved_features).build(); } void removeTestData() { @@ -117,7 +118,7 @@ struct IndexManagerTest : public ::testing::Test { DummyFileHeaderContext _fileHeaderContext; TransportAndExecutorService _service; std::unique_ptr<IndexManager> _index_manager; - Schema _schema; + std::optional<bool> _interleaved_features; DocBuilder _builder; IndexManagerTest() @@ -126,7 +127,7 @@ struct IndexManagerTest : public ::testing::Test { _fileHeaderContext(), _service(1), _index_manager(), - _schema(getSchema()), + _interleaved_features(), _builder(add_fields) { removeTestData(); @@ -139,7 +140,7 @@ struct IndexManagerTest : public ::testing::Test { } template <class FunctionType> - inline void runAsMaster(FunctionType &&function) { + void runAsMaster(FunctionType &&function) { vespalib::Gate gate; _service.write().master().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); @@ -148,7 +149,7 @@ struct IndexManagerTest : public ::testing::Test { gate.await(); } template <class FunctionType> - inline void runAsIndex(FunctionType &&function) { + void runAsIndex(FunctionType &&function) { vespalib::Gate gate; _service.write().index().execute(makeLambdaTask([&gate,function = std::move(function)]() { function(); @@ -157,8 +158,9 @@ struct IndexManagerTest : public ::testing::Test { gate.await(); } void flushIndexManager(); + void run_fusion(); Document::UP addDocument(uint32_t docid); - void resetIndexManager(); + void resetIndexManager(SerialNum serial_num = 1); void removeDocument(uint32_t docId, SerialNum serialNum) { vespalib::Gate gate; runAsIndex([&]() { @@ -177,12 +179,23 @@ struct IndexManagerTest : public ::testing::Test { } void assertStats(uint32_t expNumDiskIndexes, uint32_t expNumMemoryIndexes, - SerialNum expLastiskIndexSerialNum, + SerialNum expLastDiskIndexSerialNum, SerialNum expLastMemoryIndexSerialNum); IIndexCollection::SP get_source_collection() const { return _index_manager->getMaintainer().getSourceCollection(); } + void set_schema(const Schema& schema, SerialNum serial_num) + { + runAsMaster([&]() { _index_manager->setSchema(schema, serial_num); }); + } + + bool has_pending_urgent_flush() const { + return _index_manager->has_pending_urgent_flush(); + } + bool has_urgent_memory_index_flush() const; + bool has_urgent_fusion() const; + void assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion); }; void @@ -197,6 +210,31 @@ IndexManagerTest::flushIndexManager() } } +void +IndexManagerTest::run_fusion() +{ + IndexFusionTarget target(_index_manager->getMaintainer()); + std::unique_ptr<vespalib::Executor::Task> task; + runAsMaster([&]() { task = target.initFlush(0, std::make_shared<search::FlushToken>()); }); + if (task) { + task->run(); + } +} + +bool +IndexManagerTest::has_urgent_memory_index_flush() const +{ + IndexFlushTarget target(_index_manager->getMaintainer()); + return target.needUrgentFlush(); +} + +bool +IndexManagerTest::has_urgent_fusion() const +{ + IndexFusionTarget target(_index_manager->getMaintainer()); + return target.needUrgentFlush(); +} + Document::UP IndexManagerTest::addDocument(uint32_t id) { @@ -211,12 +249,13 @@ IndexManagerTest::addDocument(uint32_t id) } void -IndexManagerTest::resetIndexManager() +IndexManagerTest::resetIndexManager(SerialNum serial_num) { _index_manager.reset(); - _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(), 1, + _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(_interleaved_features), serial_num, _reconfigurer, _service.write(), _service.shared(), TuneFileIndexManager(), TuneFileAttributes(), _fileHeaderContext); + _serial_num = std::max(serial_num, _index_manager->getFlushedSerialNum()); } void @@ -240,6 +279,15 @@ IndexManagerTest::assertStats(uint32_t expNumDiskIndexes, uint32_t expNumMemoryI EXPECT_EQ(expLastMemoryIndexSerialNum, lastMemoryIndexSerialNum); } +void +IndexManagerTest::assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion) +{ + SCOPED_TRACE(label); + EXPECT_EQ(pending, has_pending_urgent_flush()); + EXPECT_EQ(flush, has_urgent_memory_index_flush()); + EXPECT_EQ(fusion, has_urgent_fusion()); +} + TEST_F(IndexManagerTest, require_that_empty_memory_index_is_not_flushed) { auto sources = get_source_collection(); @@ -295,10 +343,11 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) IndexFlushTarget target(_index_manager->getMaintainer()); EXPECT_EQ(vespalib::system_time(), target.getLastFlushTime()); vespalib::Executor::Task::UP flushTask; - runAsMaster([&]() { flushTask = target.initFlush(1, std::make_shared<search::FlushToken>()); }); + runAsMaster([&]() { flushTask = target.initFlush(2, std::make_shared<search::FlushToken>()); }); flushTask->run(); EXPECT_TRUE(FastOS_File::Stat("test_data/index.flush.1", &stat)); EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime()); + EXPECT_EQ(2u, target.getFlushedSerialNum()); sources = get_source_collection(); EXPECT_EQ(2u, sources->getSourceCount()); @@ -317,14 +366,15 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed) resetIndexManager(); IndexFlushTarget target(_index_manager->getMaintainer()); EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime()); + EXPECT_EQ(2u, target.getFlushedSerialNum()); // updated serial number & flush time when nothing to flush std::this_thread::sleep_for(2s); std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch()); vespalib::Executor::Task::UP task; - runAsMaster([&]() { task = target.initFlush(2, std::make_shared<search::FlushToken>()); }); + runAsMaster([&]() { task = target.initFlush(3, std::make_shared<search::FlushToken>()); }); EXPECT_FALSE(task); - EXPECT_EQ(2u, target.getFlushedSerialNum()); + EXPECT_EQ(3u, target.getFlushedSerialNum()); EXPECT_LT(stat._modifiedTime, target.getLastFlushTime()); EXPECT_NEAR(now.count(), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch()).count(), 2); } @@ -408,7 +458,7 @@ VESPA_THREAD_STACK_TAG(push_executor) TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) { - Schema schema(getSchema()); + Schema schema(getSchema(_interleaved_features)); FieldIndexCollection fic(schema, MockFieldLengthInspector()); auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2); auto pushThreads = SequencedTaskExecutor::create(push_executor, 2); @@ -773,11 +823,11 @@ TEST_F(IndexManagerTest, require_that_indexes_manager_stats_can_be_generated) { assertStats(0, 1, 0, 0); addDocument(1); - assertStats(0, 1, 0, 1); + assertStats(0, 1, 0, 2); flushIndexManager(); - assertStats(1, 1, 1, 1); + assertStats(1, 1, 2, 2); addDocument(2); - assertStats(1, 1, 1, 2); + assertStats(1, 1, 2, 3); } TEST_F(IndexManagerTest, require_that_compact_lid_space_works) @@ -879,6 +929,126 @@ TEST_F(IndexManagerTest, fusion_can_be_stopped) EXPECT_EQ(2u, spec.flush_ids[1]); } +struct EnableInterleavedFeaturesParam +{ + vespalib::string name = "no_restart"; + bool restart1 = false; // Restart after flushing 1st memory index without interleaved features + bool doc2 = false; // Feed second doc + bool restart2 = false; // Restart after flushing 2nd memory index with interleaved fatures + bool pruned_config = false; // Original config has been pruned + + EnableInterleavedFeaturesParam restart() && { + name = "restart"; + restart1 = true; + return *this; + } + EnableInterleavedFeaturesParam new_doc_restart() && { + name = "new_doc_restart"; + restart1 = true; + doc2 = true; + return *this; + } + EnableInterleavedFeaturesParam new_doc_multiple_restarts() && { + name = "new_doc_multiple_restart"; + restart1 = true; + doc2 = true; + restart2 = true; + return *this; + } + EnableInterleavedFeaturesParam new_doc_pruned_config() && { + name = "new_doc_pruned_config"; + restart1 = true; + doc2 = true; + restart2 = true; + pruned_config = true; + return *this; + } + EnableInterleavedFeaturesParam new_doc_multiple_restarts_pruned_config() && { + name = "new_doc_multiple_restarts_pruned_config"; + restart1 = true; + doc2 = true; + restart2 = true; + pruned_config = true; + return *this; + } +}; + +std::ostream& operator<<(std::ostream& os, const EnableInterleavedFeaturesParam& param) +{ + os << param.name; + return os; +} + +class IndexManagerEnableInterleavedFeaturesTest : public IndexManagerTest, + public testing::WithParamInterface<EnableInterleavedFeaturesParam> +{ +protected: + void enable_interleaved_features(const vespalib::string& label, bool flushed_interleaved_features, std::optional<SerialNum> serial_num = std::nullopt); +}; + +void +IndexManagerEnableInterleavedFeaturesTest::enable_interleaved_features(const vespalib::string& label, bool flushed_interleaved_features, std::optional<SerialNum> serial_num) +{ + if (!serial_num.has_value()) { + serial_num = ++_serial_num; + } + set_schema(getSchema(true), serial_num.value()); + assert_urgent(label, true, !flushed_interleaved_features, flushed_interleaved_features); +} + +TEST_P(IndexManagerEnableInterleavedFeaturesTest, enable_interleaved_features) +{ + const auto& params = GetParam(); + _interleaved_features = false; + SerialNum config_gen = 1; + resetIndexManager(config_gen); + // Feed first doc to memory index without interleaved features + addDocument(docid); + assert_urgent("setup", false, false, false); + flushIndexManager(); + assert_urgent("after 1st flush", false, false, false); + enable_interleaved_features("enable interleaved features", false); + auto schema_change_serial_num = _serial_num; + EXPECT_EQ(3, schema_change_serial_num); + if (params.restart1) { + // Restart after flushing 1st memory index without interleaved features + resetIndexManager(config_gen); + assert_urgent("after restart1", false, false, false); + EXPECT_EQ(schema_change_serial_num, _serial_num + 1); + enable_interleaved_features("replay enable interleaved features after restart1", false); + } + if (params.doc2) { + // Feed second doc to memory index with interleaved features + addDocument(docid + 1); + } + SerialNum disk2_serial_num = schema_change_serial_num + (params.doc2 ? 1 : 0); + EXPECT_EQ(disk2_serial_num, _serial_num); + flushIndexManager(); + assert_urgent("after 2nd flush", true, false, true); + if (params.pruned_config) { + // Original config has been pruned + _interleaved_features = true; + config_gen = 3; + } + if (params.restart2) { + // Restart after flushing 2nd memory index with interleaved fatures + resetIndexManager(config_gen); + assert_urgent("after restart2", true, false, true); + EXPECT_EQ(disk2_serial_num, _serial_num); + enable_interleaved_features("replay enable interleaved features after restart2", true, schema_change_serial_num); + } + run_fusion(); + assert_urgent("after fusion", false, false, false); +} + +auto test_values = testing::Values(EnableInterleavedFeaturesParam(), + EnableInterleavedFeaturesParam().restart(), + EnableInterleavedFeaturesParam().new_doc_restart(), + EnableInterleavedFeaturesParam().new_doc_multiple_restarts(), + EnableInterleavedFeaturesParam().new_doc_multiple_restarts_pruned_config()); + +INSTANTIATE_TEST_SUITE_P(MultiIndexManagerEnableInterleavedFeaturesTest, IndexManagerEnableInterleavedFeaturesTest, test_values, testing::PrintToStringParamName()); + } // namespace int diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h index 436b4127804..d7efd9c4679 100644 --- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h +++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h @@ -129,6 +129,9 @@ public: void setMaxFlushed(uint32_t maxFlushed) override { _maintainer.setMaxFlushed(maxFlushed); } + bool has_pending_urgent_flush() const override { + return _maintainer.has_pending_urgent_flush(); + } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h index 134aeabff58..996ac301e0b 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h @@ -29,6 +29,7 @@ struct MockIndexManager : public searchcorespi::IIndexManager void heartBeat(SerialNum) override {} void compactLidSpace(uint32_t, SerialNum) override {} void setMaxFlushed(uint32_t) override { } + bool has_pending_urgent_flush() const override { return false; } }; } diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h index a11ae12f26d..7ddfc33980d 100644 --- a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h +++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h @@ -200,6 +200,15 @@ public: * @param maxFlushed The max number of flushed indexes before fusion is urgent. */ virtual void setMaxFlushed(uint32_t maxFlushed) = 0; + + /** + * Checks if we have a pending urgent flush due to a recent + * schema change (e.g. regeneration of interleaved features in + * disk indexes). + * + * @return whether an urgent flush is pending + */ + virtual bool has_pending_urgent_flush() const = 0; }; } // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp index 1634937f094..74c3e16227e 100644 --- a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp @@ -76,6 +76,7 @@ IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const object.setLong("lastSerialNum", _mgr->getCurrentSerialNum()); if (full) { IndexManagerStats stats(*_mgr); + object.setBool("pending_urgent_flush", _mgr->has_pending_urgent_flush()); Cursor &diskIndexArrayCursor = object.setArray("diskIndexes"); for (const auto &diskIndex : stats.getDiskIndexes()) { insertDiskIndex(diskIndexArrayCursor, diskIndex); diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp index b5a5e2c2843..934e9a6520e 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -44,7 +44,8 @@ IndexFlushTarget::needUrgentFlush() const // Due to limitation of 16G address space of single datastore // TODO: Even better if urgency was decided by memory index itself. bool urgent = (_numFrozenMemoryIndexes > _maxFrozenMemoryIndexes) || - (getApproxMemoryGain().gain() > ssize_t(16_Gi)); + (getApproxMemoryGain().gain() > ssize_t(16_Gi)) || + _indexMaintainer.urgent_memory_index_flush(); SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum(); LOG(debug, "Num frozen: %u Memory gain: %" PRId64 " Urgent: %d, flushedSerial=%" PRIu64, _numFrozenMemoryIndexes, getApproxMemoryGain().gain(), static_cast<int>(urgent), flushedSerial); diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp index 562d49a4348..2baaaa19b93 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -69,7 +69,8 @@ IndexFusionTarget::getApproxDiskGain() const bool IndexFusionTarget::needUrgentFlush() const { - bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed) && (_fusionStats._canRunFusion); + bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed || _indexMaintainer.urgent_disk_index_fusion()) && + (_fusionStats._canRunFusion); LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent); return urgent; } diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp index a7828c00bc4..96a29994fbc 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -768,26 +768,8 @@ IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current) } } -namespace { - -bool -has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema) -{ - for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { - if (itr.hasMatchingOldFields(old_schema) && - !itr.has_matching_use_interleaved_features(old_schema)) - { - return false; - } - } - return true; -} - -} - - void -IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex) +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num) { assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor LockGuard state_lock(_state_lock); @@ -818,12 +800,12 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); } _current_index = newIndex; - // Non-matching interleaved features in schemas means that we need to - // reconstruct or drop interleaved features in posting lists. - // If so, we must flush the new index to disk even if it is empty. - // This ensures that 2x triggerFlush will run fusion - // to reconstruct or drop interleaved features in the posting lists. - _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema); + if (serial_num > flush_serial_num()) { + consider_urgent_flush(args._oldSchema, args._newSchema, get_absolute_id()); + } + // If schema changes triggered a need for urgent flush then we must + // be able to flush the new index to disk even if it is empty. + _flush_empty_current_index = (_urgent_flush_id == get_absolute_id()); } if (dropEmptyLast) { replaceSource(_current_index_id, _current_index); @@ -887,6 +869,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, _last_fusion_id(), _next_id(), _current_index_id(), + _urgent_flush_id(), _current_index(), _flush_empty_current_index(false), _current_serial_num(0), @@ -950,6 +933,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, pruneRemovedFields(_schema, config.getSerialNum()); })); _ctx.getThreadingService().master().sync(); + consider_initial_urgent_flush(); } IndexMaintainer::~IndexMaintainer() @@ -1320,7 +1304,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) // Ensure that all index thread tasks accessing memory index have completed. commit_and_wait(); // Everything should be quiet now. - doneSetSchema(args, new_index); + doneSetSchema(args, new_index, serialNum); // Source collection has now changed, caller must reconfigure further // as appropriate. } @@ -1358,4 +1342,79 @@ IndexMaintainer::setMaxFlushed(uint32_t maxFlushed) _maxFlushed = maxFlushed; } +void +IndexMaintainer::consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id) +{ + // Non-matching interleaved features in schemas means that we need to + // reconstruct or drop interleaved features in posting lists. Schedule + // urgent flush until all indexes are in sync. + for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { + if (itr.hasMatchingOldFields(old_schema) && + !itr.has_matching_use_interleaved_features(old_schema)) + { + _urgent_flush_id = flush_id; + break; + } + } +} + +void +IndexMaintainer::consider_initial_urgent_flush() +{ + const Schema *prev_schema = nullptr; + std::optional<uint32_t> urgent_source_id; + auto coll = getSourceCollection(); + uint32_t count = coll->getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll->getSearchable(i); + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d != nullptr) { + auto schema = &d->getSchema(); + if (prev_schema != nullptr) { + consider_urgent_flush(*prev_schema, *schema, _last_fusion_id + coll->getSourceId(i)); + } + prev_schema = schema; + } + } +} + +uint32_t +IndexMaintainer::get_urgent_flush_id() const +{ + LockGuard lock(_index_update_lock); + return _urgent_flush_id; +} + +bool +IndexMaintainer::urgent_memory_index_flush() const +{ + LockGuard lock(_index_update_lock); + for (auto& frozen : _frozenMemoryIndexes) { + if (frozen._absoluteId == _urgent_flush_id) { + return true; + } + } + if (get_absolute_id() == _urgent_flush_id) { + return true; + } + return false; +} + +bool +IndexMaintainer::urgent_disk_index_fusion() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + auto& flush_ids = _fusion_spec.flush_ids; + return std::find(flush_ids.begin(), flush_ids.end(), urgent_flush_id) != std::end(flush_ids); +} + +bool +IndexMaintainer::has_pending_urgent_flush() const +{ + uint32_t urgent_flush_id = get_urgent_flush_id(); + LockGuard lock(_fusion_lock); + return urgent_flush_id > _fusion_spec.last_fusion_id; +} + } diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h index 963e669bc4c..1777b61f6e6 100644 --- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h @@ -86,6 +86,7 @@ class IndexMaintainer : public IIndexManager, uint32_t _last_fusion_id; // Protected by SL + IUL uint32_t _next_id; // Protected by SL + IUL uint32_t _current_index_id; // Protected by SL + IUL + uint32_t _urgent_flush_id; // Protected by SL + IUL std::shared_ptr<IMemoryIndex> _current_index; // Protected by SL + IUL bool _flush_empty_current_index; std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL @@ -243,7 +244,7 @@ class IndexMaintainer : public IIndexManager, ~SetSchemaArgs(); }; - void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex); + void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num); Schema getSchema(void) const; std::shared_ptr<Schema> getActiveFusionPrunedSchema() const; @@ -368,6 +369,12 @@ public: IFlushTarget::List getFlushTargets() override; void setSchema(const Schema & schema, SerialNum serialNum) override ; void setMaxFlushed(uint32_t maxFlushed) override; + void consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id); + void consider_initial_urgent_flush(); + uint32_t get_urgent_flush_id() const; + bool urgent_memory_index_flush() const; + bool urgent_disk_index_fusion() const; + bool has_pending_urgent_flush() const override; }; } |