summary refs log tree commit diff stats
path: root/searchcore
diff options
context:
space:
mode:
author Tor Egge <Tor.Egge@yahooinc.com> 2023-10-10 14:23:22 +0200
committer GitHub <noreply@github.com> 2023-10-10 14:23:22 +0200
commit 42707ddb5a30dc4e530e2f5d2d56539462a62ad9 (patch)
tree 3a2b418f5748dad750d9bb249be3003da8f6a3ff /searchcore
parent f36d2cf35428720fc40f330ed2ce27b325e175db (diff)
parent 26a777173863791c024b56aab805bce5737dcf00 (diff)
Merge pull request #28840 from vespa-engine/toregge/flush-indexes-when-enabling-interleaved-features
Flush memory indexes to disk then fusion disk indexes as soon as
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/src/tests/proton/index/indexmanager_test.cpp 230
-rw-r--r-- searchcore/src/vespa/searchcore/proton/index/indexmanager.h 3
-rw-r--r-- searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h 1
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/iindexmanager.h 9
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp 1
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp 3
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp 3
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp 111
-rw-r--r-- searchcore/src/vespa/searchcorespi/index/indexmaintainer.h 9
9 files changed, 324 insertions, 46 deletions
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index f7bccd43576..ea447a85e06 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -31,7 +31,9 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/fastos/file.h>
#include <filesystem>
+#include <ostream>
#include <set>
+#include <sstream>
#include <thread>
#include <vespa/log/log.h>
@@ -87,9 +89,9 @@ const uint32_t docid = 1;
auto add_fields = [](auto& header) { header.addField(field_name, document::DataType::T_STRING); };
-Schema getSchema() {
+Schema getSchema(std::optional<bool> interleaved_features) {
DocBuilder db(add_fields);
- return SchemaBuilder(db).add_all_indexes().build();
+ return SchemaBuilder(db).add_all_indexes(interleaved_features).build();
}
void removeTestData() {
@@ -117,7 +119,7 @@ struct IndexManagerTest : public ::testing::Test {
DummyFileHeaderContext _fileHeaderContext;
TransportAndExecutorService _service;
std::unique_ptr<IndexManager> _index_manager;
- Schema _schema;
+ std::optional<bool> _interleaved_features;
DocBuilder _builder;
IndexManagerTest()
@@ -126,7 +128,7 @@ struct IndexManagerTest : public ::testing::Test {
_fileHeaderContext(),
_service(1),
_index_manager(),
- _schema(getSchema()),
+ _interleaved_features(),
_builder(add_fields)
{
removeTestData();
@@ -139,7 +141,7 @@ struct IndexManagerTest : public ::testing::Test {
}
template <class FunctionType>
- inline void runAsMaster(FunctionType &&function) {
+ void runAsMaster(FunctionType &&function) {
vespalib::Gate gate;
_service.write().master().execute(makeLambdaTask([&gate,function = std::move(function)]() {
function();
@@ -148,7 +150,7 @@ struct IndexManagerTest : public ::testing::Test {
gate.await();
}
template <class FunctionType>
- inline void runAsIndex(FunctionType &&function) {
+ void runAsIndex(FunctionType &&function) {
vespalib::Gate gate;
_service.write().index().execute(makeLambdaTask([&gate,function = std::move(function)]() {
function();
@@ -157,8 +159,9 @@ struct IndexManagerTest : public ::testing::Test {
gate.await();
}
void flushIndexManager();
+ void run_fusion();
Document::UP addDocument(uint32_t docid);
- void resetIndexManager();
+ void resetIndexManager(SerialNum serial_num = 1);
void removeDocument(uint32_t docId, SerialNum serialNum) {
vespalib::Gate gate;
runAsIndex([&]() {
@@ -177,12 +180,23 @@ struct IndexManagerTest : public ::testing::Test {
}
void assertStats(uint32_t expNumDiskIndexes,
uint32_t expNumMemoryIndexes,
- SerialNum expLastiskIndexSerialNum,
+ SerialNum expLastDiskIndexSerialNum,
SerialNum expLastMemoryIndexSerialNum);
IIndexCollection::SP get_source_collection() const {
return _index_manager->getMaintainer().getSourceCollection();
}
+ void set_schema(const Schema& schema, SerialNum serial_num)
+ {
+ runAsMaster([&]() { _index_manager->setSchema(schema, serial_num); });
+ }
+
+ bool has_pending_urgent_flush() const {
+ return _index_manager->has_pending_urgent_flush();
+ }
+ bool has_urgent_memory_index_flush() const;
+ bool has_urgent_fusion() const;
+ void assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion);
};
void
@@ -197,6 +211,31 @@ IndexManagerTest::flushIndexManager()
}
}
+void
+IndexManagerTest::run_fusion()
+{
+ IndexFusionTarget target(_index_manager->getMaintainer());
+ std::unique_ptr<vespalib::Executor::Task> task;
+ runAsMaster([&]() { task = target.initFlush(0, std::make_shared<search::FlushToken>()); });
+ if (task) {
+ task->run();
+ }
+}
+
+bool
+IndexManagerTest::has_urgent_memory_index_flush() const
+{
+ IndexFlushTarget target(_index_manager->getMaintainer());
+ return target.needUrgentFlush();
+}
+
+bool
+IndexManagerTest::has_urgent_fusion() const
+{
+ IndexFusionTarget target(_index_manager->getMaintainer());
+ return target.needUrgentFlush();
+}
+
Document::UP
IndexManagerTest::addDocument(uint32_t id)
{
@@ -211,12 +250,13 @@ IndexManagerTest::addDocument(uint32_t id)
}
void
-IndexManagerTest::resetIndexManager()
+IndexManagerTest::resetIndexManager(SerialNum serial_num)
{
_index_manager.reset();
- _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(), 1,
+ _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(_interleaved_features), serial_num,
_reconfigurer, _service.write(), _service.shared(),
TuneFileIndexManager(), TuneFileAttributes(), _fileHeaderContext);
+ _serial_num = std::max(serial_num, _index_manager->getFlushedSerialNum());
}
void
@@ -240,6 +280,15 @@ IndexManagerTest::assertStats(uint32_t expNumDiskIndexes, uint32_t expNumMemoryI
EXPECT_EQ(expLastMemoryIndexSerialNum, lastMemoryIndexSerialNum);
}
+void
+IndexManagerTest::assert_urgent(const vespalib::string& label, bool pending, bool flush, bool fusion)
+{
+ SCOPED_TRACE(label);
+ EXPECT_EQ(pending, has_pending_urgent_flush());
+ EXPECT_EQ(flush, has_urgent_memory_index_flush());
+ EXPECT_EQ(fusion, has_urgent_fusion());
+}
+
TEST_F(IndexManagerTest, require_that_empty_memory_index_is_not_flushed)
{
auto sources = get_source_collection();
@@ -295,10 +344,11 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed)
IndexFlushTarget target(_index_manager->getMaintainer());
EXPECT_EQ(vespalib::system_time(), target.getLastFlushTime());
vespalib::Executor::Task::UP flushTask;
- runAsMaster([&]() { flushTask = target.initFlush(1, std::make_shared<search::FlushToken>()); });
+ runAsMaster([&]() { flushTask = target.initFlush(2, std::make_shared<search::FlushToken>()); });
flushTask->run();
EXPECT_TRUE(FastOS_File::Stat("test_data/index.flush.1", &stat));
EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime());
+ EXPECT_EQ(2u, target.getFlushedSerialNum());
sources = get_source_collection();
EXPECT_EQ(2u, sources->getSourceCount());
@@ -317,14 +367,15 @@ TEST_F(IndexManagerTest, require_that_memory_index_is_flushed)
resetIndexManager();
IndexFlushTarget target(_index_manager->getMaintainer());
EXPECT_EQ(stat._modifiedTime, target.getLastFlushTime());
+ EXPECT_EQ(2u, target.getFlushedSerialNum());
// updated serial number & flush time when nothing to flush
std::this_thread::sleep_for(2s);
std::chrono::seconds now = duration_cast<seconds>(vespalib::system_clock::now().time_since_epoch());
vespalib::Executor::Task::UP task;
- runAsMaster([&]() { task = target.initFlush(2, std::make_shared<search::FlushToken>()); });
+ runAsMaster([&]() { task = target.initFlush(3, std::make_shared<search::FlushToken>()); });
EXPECT_FALSE(task);
- EXPECT_EQ(2u, target.getFlushedSerialNum());
+ EXPECT_EQ(3u, target.getFlushedSerialNum());
EXPECT_LT(stat._modifiedTime, target.getLastFlushTime());
EXPECT_NEAR(now.count(), duration_cast<seconds>(target.getLastFlushTime().time_since_epoch()).count(), 2);
}
@@ -408,7 +459,7 @@ VESPA_THREAD_STACK_TAG(push_executor)
TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated)
{
- Schema schema(getSchema());
+ Schema schema(getSchema(_interleaved_features));
FieldIndexCollection fic(schema, MockFieldLengthInspector());
auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2);
auto pushThreads = SequencedTaskExecutor::create(push_executor, 2);
@@ -773,11 +824,11 @@ TEST_F(IndexManagerTest, require_that_indexes_manager_stats_can_be_generated)
{
assertStats(0, 1, 0, 0);
addDocument(1);
- assertStats(0, 1, 0, 1);
+ assertStats(0, 1, 0, 2);
flushIndexManager();
- assertStats(1, 1, 1, 1);
+ assertStats(1, 1, 2, 2);
addDocument(2);
- assertStats(1, 1, 1, 2);
+ assertStats(1, 1, 2, 3);
}
TEST_F(IndexManagerTest, require_that_compact_lid_space_works)
@@ -879,6 +930,151 @@ TEST_F(IndexManagerTest, fusion_can_be_stopped)
EXPECT_EQ(2u, spec.flush_ids[1]);
}
+struct EnableInterleavedFeaturesParam
+{
+ enum class Restart {
+ NONE,
+ RESTART1,
+ RESTART2
+ };
+ vespalib::string name = "no_restart";
+ Restart restart = Restart::NONE;
+    bool doc = false; // Feed doc after enabling interleaved features
+ bool pruned_config = false; // Original config has been pruned
+
+ EnableInterleavedFeaturesParam no_doc_restart1() && {
+ name = "restart1";
+ restart = Restart::RESTART1;
+ return *this;
+ }
+ EnableInterleavedFeaturesParam doc_restart1() && {
+ name = "doc_restart1";
+ restart = Restart::RESTART1;
+ doc = true;
+ return *this;
+ }
+ EnableInterleavedFeaturesParam doc_restart2() && {
+ name = "doc_restart2";
+ restart = Restart::RESTART2;
+ doc = true;
+ return *this;
+ }
+ EnableInterleavedFeaturesParam doc_restart2_pruned_config() && {
+ name = "doc_restart2_pruned_config";
+ restart = Restart::RESTART2;
+ doc = true;
+ pruned_config = true;
+ return *this;
+ }
+};
+
+std::ostream& operator<<(std::ostream& os, const EnableInterleavedFeaturesParam& param)
+{
+ os << param.name;
+ return os;
+}
+
+std::string
+param_as_string(const testing::TestParamInfo<std::tuple<bool, bool, EnableInterleavedFeaturesParam>>& info)
+{
+ std::ostringstream os;
+ auto& param = info.param;
+ os << (std::get<0>(param) ? "disk_" : "");
+ os << (std::get<1>(param) ? "m1_" : "m0_");
+ os << std::get<2>(param);
+ return os.str();
+}
+
+class IndexManagerEnableInterleavedFeaturesTest : public IndexManagerTest,
+ public testing::WithParamInterface<std::tuple<bool, bool, EnableInterleavedFeaturesParam>>
+{
+protected:
+ void enable_interleaved_features(const vespalib::string& label, bool old_config_docs, bool flushed_interleaved_features, std::optional<SerialNum> serial_num = std::nullopt);
+};
+
+void
+IndexManagerEnableInterleavedFeaturesTest::enable_interleaved_features(const vespalib::string& label, bool old_config_docs, bool flushed_interleaved_features, std::optional<SerialNum> serial_num)
+{
+ if (!serial_num.has_value()) {
+ serial_num = ++_serial_num;
+ }
+ set_schema(getSchema(true), serial_num.value());
+ assert_urgent(label, old_config_docs, old_config_docs && !flushed_interleaved_features, old_config_docs && flushed_interleaved_features);
+}
+
+TEST_P(IndexManagerEnableInterleavedFeaturesTest, enable_interleaved_features)
+{
+ using Restart = EnableInterleavedFeaturesParam::Restart;
+ const auto& params = GetParam();
+ // State before enabling interleaved features
+ bool initial_disk_index = std::get<0>(params);
+ bool nonempty_memory_index = std::get<1>(params);
+ // State for after enabling interleaved features
+ const auto& enable_params = std::get<2>(params);
+ bool old_config_docs = false;
+
+ _interleaved_features = false;
+ SerialNum config_serial_num = 1;
+ resetIndexManager(config_serial_num);
+ if (initial_disk_index) {
+ // Feed doc to memory index without interleaved features and flush
+ // memory index to disk
+ addDocument(docid);
+ old_config_docs = true;
+ flushIndexManager();
+ }
+ if (nonempty_memory_index) {
+ // Feed doc to memory index without interleaved features
+ addDocument(docid + 1);
+ old_config_docs = true;
+ }
+ assert_urgent("setup", false, false, false);
+ enable_interleaved_features("enable interleaved features", old_config_docs, false);
+ auto schema_change_serial_num = _serial_num;
+ EXPECT_EQ(2 + (initial_disk_index ? 1 : 0) + (nonempty_memory_index ? 1 : 0), schema_change_serial_num);
+ if (enable_params.restart == Restart::RESTART1) {
+ // Restart after flushing 1st memory index without interleaved features
+ resetIndexManager(config_serial_num);
+ assert_urgent("after restart1", false, false, false);
+ if (nonempty_memory_index) {
+ addDocument(docid + 1);
+ }
+ EXPECT_EQ(schema_change_serial_num, _serial_num + 1);
+ enable_interleaved_features("replay enable interleaved features after restart1", old_config_docs, false);
+ }
+ if (enable_params.doc) {
+ // Feed second doc to memory index with interleaved features
+ addDocument(docid + 2);
+ }
+ SerialNum disk2_serial_num = schema_change_serial_num + (enable_params.doc ? 1 : 0);
+ EXPECT_EQ(disk2_serial_num, _serial_num);
+ flushIndexManager();
+ assert_urgent("after 2nd flush", old_config_docs, false, old_config_docs);
+ if (enable_params.pruned_config) {
+ // Original config has been pruned
+ _interleaved_features = true;
+ config_serial_num = schema_change_serial_num;
+ }
+ if (enable_params.restart == Restart::RESTART2) {
+        // Restart after flushing 2nd memory index with interleaved features
+ resetIndexManager(config_serial_num);
+ assert_urgent("after restart2", old_config_docs, false, old_config_docs);
+ EXPECT_EQ(disk2_serial_num, _serial_num);
+ enable_interleaved_features("replay enable interleaved features after restart2", old_config_docs, true, schema_change_serial_num);
+ }
+ run_fusion();
+ assert_urgent("after fusion", false, false, false);
+}
+
+auto test_values = testing::Combine(testing::Bool(), testing::Bool(),
+ testing::Values(EnableInterleavedFeaturesParam(),
+ EnableInterleavedFeaturesParam().no_doc_restart1(),
+ EnableInterleavedFeaturesParam().doc_restart1(),
+ EnableInterleavedFeaturesParam().doc_restart2(),
+ EnableInterleavedFeaturesParam().doc_restart2_pruned_config()));
+
+INSTANTIATE_TEST_SUITE_P(MultiIndexManagerEnableInterleavedFeaturesTest, IndexManagerEnableInterleavedFeaturesTest, test_values, param_as_string);
+
} // namespace
int
diff --git a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
index b0243763824..c1ad4bb316b 100644
--- a/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
+++ b/searchcore/src/vespa/searchcore/proton/index/indexmanager.h
@@ -129,6 +129,9 @@ public:
void setMaxFlushed(uint32_t maxFlushed) override {
_maintainer.setMaxFlushed(maxFlushed);
}
+ bool has_pending_urgent_flush() const override {
+ return _maintainer.has_pending_urgent_flush();
+ }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h
index 12a732ca081..a9f386bfc12 100644
--- a/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h
+++ b/searchcore/src/vespa/searchcore/proton/test/mock_index_manager.h
@@ -29,6 +29,7 @@ struct MockIndexManager : public searchcorespi::IIndexManager
void heartBeat(SerialNum) override {}
void compactLidSpace(uint32_t, SerialNum) override {}
void setMaxFlushed(uint32_t) override { }
+ bool has_pending_urgent_flush() const override { return false; }
};
}
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h
index 44e1ad306db..b6bbe416562 100644
--- a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h
+++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h
@@ -200,6 +200,15 @@ public:
* @param maxFlushed The max number of flushed indexes before fusion is urgent.
*/
virtual void setMaxFlushed(uint32_t maxFlushed) = 0;
+
+ /**
+ * Checks if we have a pending urgent flush due to a recent
+ * schema change (e.g. regeneration of interleaved features in
+ * disk indexes).
+ *
+ * @return whether an urgent flush is pending
+ */
+ virtual bool has_pending_urgent_flush() const = 0;
};
} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp
index cfb4c3e609f..051175be9af 100644
--- a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp
@@ -76,6 +76,7 @@ IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const
object.setLong("lastSerialNum", _mgr->getCurrentSerialNum());
if (full) {
IndexManagerStats stats(*_mgr);
+ object.setBool("pending_urgent_flush", _mgr->has_pending_urgent_flush());
Cursor &diskIndexArrayCursor = object.setArray("diskIndexes");
for (const auto &diskIndex : stats.getDiskIndexes()) {
insertDiskIndex(diskIndexArrayCursor, diskIndex);
diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
index 9e290c7e525..4e8201f75fd 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -44,7 +44,8 @@ IndexFlushTarget::needUrgentFlush() const
// Due to limitation of 16G address space of single datastore
// TODO: Even better if urgency was decided by memory index itself.
bool urgent = (_numFrozenMemoryIndexes > _maxFrozenMemoryIndexes) ||
- (getApproxMemoryGain().gain() > ssize_t(16_Gi));
+ (getApproxMemoryGain().gain() > ssize_t(16_Gi)) ||
+ _indexMaintainer.urgent_memory_index_flush();
SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum();
LOG(debug, "Num frozen: %u Memory gain: %" PRId64 " Urgent: %d, flushedSerial=%" PRIu64,
_numFrozenMemoryIndexes, getApproxMemoryGain().gain(), static_cast<int>(urgent), flushedSerial);
diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
index 175f8dbd800..f889fad136f 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -69,7 +69,8 @@ IndexFusionTarget::getApproxDiskGain() const
bool
IndexFusionTarget::needUrgentFlush() const
{
- bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed) && (_fusionStats._canRunFusion);
+ bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed || _indexMaintainer.urgent_disk_index_fusion()) &&
+ (_fusionStats._canRunFusion);
LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent);
return urgent;
}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
index 8270725f8e3..50bbbaec355 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -768,26 +768,8 @@ IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current)
}
}
-namespace {
-
-bool
-has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema)
-{
- for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
- if (itr.hasMatchingOldFields(old_schema) &&
- !itr.has_matching_use_interleaved_features(old_schema))
- {
- return false;
- }
- }
- return true;
-}
-
-}
-
-
void
-IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex)
+IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num)
{
assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
LockGuard state_lock(_state_lock);
@@ -818,12 +800,12 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex
_frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId);
}
_current_index = newIndex;
- // Non-matching interleaved features in schemas means that we need to
- // reconstruct or drop interleaved features in posting lists.
- // If so, we must flush the new index to disk even if it is empty.
- // This ensures that 2x triggerFlush will run fusion
- // to reconstruct or drop interleaved features in the posting lists.
- _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema);
+ if (serial_num > flush_serial_num() && get_absolute_id() > 1) {
+ consider_urgent_flush(args._oldSchema, args._newSchema, get_absolute_id());
+ }
+ // If schema changes triggered a need for urgent flush then we must
+ // be able to flush the new index to disk even if it is empty.
+ _flush_empty_current_index = (_urgent_flush_id == get_absolute_id());
}
if (dropEmptyLast) {
replaceSource(_current_index_id, _current_index);
@@ -887,6 +869,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
_last_fusion_id(),
_next_id(),
_current_index_id(),
+ _urgent_flush_id(),
_current_index(),
_flush_empty_current_index(false),
_current_serial_num(0),
@@ -950,6 +933,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
pruneRemovedFields(_schema, config.getSerialNum());
}));
_ctx.getThreadingService().master().sync();
+ consider_initial_urgent_flush();
}
IndexMaintainer::~IndexMaintainer()
@@ -1320,7 +1304,7 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
// Ensure that all index thread tasks accessing memory index have completed.
commit_and_wait();
// Everything should be quiet now.
- doneSetSchema(args, new_index);
+ doneSetSchema(args, new_index, serialNum);
// Source collection has now changed, caller must reconfigure further
// as appropriate.
}
@@ -1358,4 +1342,79 @@ IndexMaintainer::setMaxFlushed(uint32_t maxFlushed)
_maxFlushed = maxFlushed;
}
+void
+IndexMaintainer::consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id)
+{
+ // Non-matching interleaved features in schemas means that we need to
+ // reconstruct or drop interleaved features in posting lists. Schedule
+ // urgent flush until all indexes are in sync.
+ for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
+ if (itr.hasMatchingOldFields(old_schema) &&
+ !itr.has_matching_use_interleaved_features(old_schema))
+ {
+ _urgent_flush_id = flush_id;
+ break;
+ }
+ }
+}
+
+void
+IndexMaintainer::consider_initial_urgent_flush()
+{
+ const Schema *prev_schema = nullptr;
+ std::optional<uint32_t> urgent_source_id;
+ auto coll = getSourceCollection();
+ uint32_t count = coll->getSourceCount();
+ for (uint32_t i = 0; i < count; ++i) {
+ IndexSearchable &is = coll->getSearchable(i);
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d != nullptr) {
+ auto schema = &d->getSchema();
+ if (prev_schema != nullptr) {
+ consider_urgent_flush(*prev_schema, *schema, _last_fusion_id + coll->getSourceId(i));
+ }
+ prev_schema = schema;
+ }
+ }
+}
+
+uint32_t
+IndexMaintainer::get_urgent_flush_id() const
+{
+ LockGuard lock(_index_update_lock);
+ return _urgent_flush_id;
+}
+
+bool
+IndexMaintainer::urgent_memory_index_flush() const
+{
+ LockGuard lock(_index_update_lock);
+ for (auto& frozen : _frozenMemoryIndexes) {
+ if (frozen._absoluteId == _urgent_flush_id) {
+ return true;
+ }
+ }
+ if (get_absolute_id() == _urgent_flush_id) {
+ return true;
+ }
+ return false;
+}
+
+bool
+IndexMaintainer::urgent_disk_index_fusion() const
+{
+ uint32_t urgent_flush_id = get_urgent_flush_id();
+ LockGuard lock(_fusion_lock);
+ auto& flush_ids = _fusion_spec.flush_ids;
+ return std::find(flush_ids.begin(), flush_ids.end(), urgent_flush_id) != std::end(flush_ids);
+}
+
+bool
+IndexMaintainer::has_pending_urgent_flush() const
+{
+ uint32_t urgent_flush_id = get_urgent_flush_id();
+ LockGuard lock(_fusion_lock);
+ return urgent_flush_id > _fusion_spec.last_fusion_id;
+}
+
}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
index 9c0d7c7373e..6db588d83ac 100644
--- a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -86,6 +86,7 @@ class IndexMaintainer : public IIndexManager,
uint32_t _last_fusion_id; // Protected by SL + IUL
uint32_t _next_id; // Protected by SL + IUL
uint32_t _current_index_id; // Protected by SL + IUL
+ uint32_t _urgent_flush_id; // Protected by SL + IUL
std::shared_ptr<IMemoryIndex> _current_index; // Protected by SL + IUL
bool _flush_empty_current_index;
std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL
@@ -243,7 +244,7 @@ class IndexMaintainer : public IIndexManager,
~SetSchemaArgs();
};
- void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex);
+ void doneSetSchema(SetSchemaArgs &args, std::shared_ptr<IMemoryIndex>& newIndex, SerialNum serial_num);
Schema getSchema(void) const;
std::shared_ptr<Schema> getActiveFusionPrunedSchema() const;
@@ -368,6 +369,12 @@ public:
IFlushTarget::List getFlushTargets() override;
void setSchema(const Schema & schema, SerialNum serialNum) override ;
void setMaxFlushed(uint32_t maxFlushed) override;
+ void consider_urgent_flush(const Schema& old_schema, const Schema& new_schema, uint32_t flush_id);
+ void consider_initial_urgent_flush();
+ uint32_t get_urgent_flush_id() const;
+ bool urgent_memory_index_flush() const;
+ bool urgent_disk_index_fusion() const;
+ bool has_pending_urgent_flush() const override;
};
}