summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-05-15 11:50:08 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-05-15 11:50:46 +0000
commit81fb601117642a0e531630cc504cb4a6855d27df (patch)
tree093d429e9e791df3870aa84a3601489c2f214351 /searchcore
parentfb35d47f9dfdc0cab773b86a496e010877d72afe (diff)
Collapse searchcorespi into searchcore
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/CMakeLists.txt6
-rw-r--r--searchcore/src/tests/index/disk_indexes/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp196
-rw-r--r--searchcore/src/tests/index/index_disk_layout/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp60
-rw-r--r--searchcore/src/tests/proton/flushengine/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt2
-rw-r--r--searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt2
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h11
-rw-r--r--searchcore/src/vespa/searchcorespi/.gitignore3
-rw-r--r--searchcore/src/vespa/searchcorespi/CMakeLists.txt7
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/.gitignore2
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt6
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/flushstats.cpp13
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/flushstats.h28
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/flushtask.h18
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/iflushtarget.h190
-rw-r--r--searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h31
-rw-r--r--searchcore/src/vespa/searchcorespi/index/.gitignore2
-rw-r--r--searchcore/src/vespa/searchcorespi/index/CMakeLists.txt28
-rw-r--r--searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp25
-rw-r--r--searchcore/src/vespa/searchcorespi/index/disk_index_stats.h26
-rw-r--r--searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp131
-rw-r--r--searchcore/src/vespa/searchcorespi/index/disk_indexes.h46
-rw-r--r--searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp125
-rw-r--r--searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h26
-rw-r--r--searchcore/src/vespa/searchcorespi/index/eventlogger.cpp69
-rw-r--r--searchcore/src/vespa/searchcorespi/index/eventlogger.h25
-rw-r--r--searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h50
-rw-r--r--searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp133
-rw-r--r--searchcore/src/vespa/searchcorespi/index/fusionrunner.h65
-rw-r--r--searchcore/src/vespa/searchcorespi/index/fusionspec.h22
-rw-r--r--searchcore/src/vespa/searchcorespi/index/i_thread_service.h34
-rw-r--r--searchcore/src/vespa/searchcorespi/index/idiskindex.h31
-rw-r--r--searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp46
-rw-r--r--searchcore/src/vespa/searchcorespi/index/iindexcollection.h49
-rw-r--r--searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h63
-rw-r--r--searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp8
-rw-r--r--searchcore/src/vespa/searchcorespi/index/iindexmanager.h206
-rw-r--r--searchcore/src/vespa/searchcorespi/index/imemoryindex.h85
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_disk_dir.h36
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp15
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h30
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp74
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h25
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp58
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_manager_stats.h31
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp26
-rw-r--r--searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h29
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexcollection.cpp253
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexcollection.h69
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp77
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexdisklayout.h38
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp83
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexflushtarget.h38
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp102
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h34
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp1363
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainer.h376
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp27
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h64
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp22
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h55
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp19
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h42
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp88
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexreadutilities.h23
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexsearchable.h59
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h26
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp194
-rw-r--r--searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h44
-rw-r--r--searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp32
-rw-r--r--searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h34
-rw-r--r--searchcore/src/vespa/searchcorespi/index/ithreadingservice.h88
-rw-r--r--searchcore/src/vespa/searchcorespi/index/memory_index_stats.h10
-rw-r--r--searchcore/src/vespa/searchcorespi/index/warmupconfig.h22
-rw-r--r--searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp269
-rw-r--r--searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h128
87 files changed, 5905 insertions, 10 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index 7fb98cb2514..8df693e8ea2 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -15,7 +15,6 @@ vespa_define_module(
messagebus
documentapi
persistence
- searchcorespi
searchsummary
fileacquirer
@@ -41,6 +40,9 @@ vespa_define_module(
src/vespa/searchcore/proton/server
src/vespa/searchcore/proton/summaryengine
src/vespa/searchcore/proton/test
+ src/vespa/searchcorespi
+ src/vespa/searchcorespi/flush
+ src/vespa/searchcorespi/index
APPS
src/apps/proton
@@ -158,6 +160,8 @@ vespa_define_module(
src/tests/proton/statusreport
src/tests/proton/summaryengine
src/tests/proton/verify_ranksetup
+ src/tests/index/disk_indexes
+ src/tests/index/index_disk_layout
TEST_DEPENDS
messagebus_messagebus-test
diff --git a/searchcore/src/tests/index/disk_indexes/CMakeLists.txt b/searchcore/src/tests/index/disk_indexes/CMakeLists.txt
new file mode 100644
index 00000000000..81b6bb0e8a9
--- /dev/null
+++ b/searchcore/src/tests/index/disk_indexes/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcorespi_disk_indexes_test_app
+ SOURCES
+ disk_indexes_test.cpp
+ DEPENDS
+ searchcorespi
+ GTest::GTest
+)
+vespa_add_test(NAME searchcorespi_disk_indexes_test_app COMMAND searchcorespi_disk_indexes_test_app)
diff --git a/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp b/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp
new file mode 100644
index 00000000000..d22ad499316
--- /dev/null
+++ b/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp
@@ -0,0 +1,196 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/searchcorespi/index/disk_indexes.h>
+#include <vespa/searchcorespi/index/index_disk_dir.h>
+#include <vespa/searchcorespi/index/indexdisklayout.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <fstream>
+
+namespace {
+
+vespalib::string base_dir("base");
+
+constexpr uint32_t block_size = 4_Ki;
+
+}
+
+namespace searchcorespi::index {
+
+class DiskIndexesTest : public ::testing::Test,
+ public DiskIndexes
+{
+ IndexDiskLayout _layout;
+protected:
+ DiskIndexesTest();
+ ~DiskIndexesTest();
+
+ static IndexDiskDir get_index_disk_dir(const vespalib::string& dir) {
+ return IndexDiskLayout::get_index_disk_dir(dir);
+ }
+
+ void assert_transient_size(uint64_t exp, IndexDiskDir index_disk_dir) {
+ EXPECT_EQ(exp, get_transient_size(_layout, index_disk_dir));
+ }
+};
+
+DiskIndexesTest::DiskIndexesTest()
+ : ::testing::Test(),
+ DiskIndexes(),
+ _layout(base_dir)
+{
+}
+
+DiskIndexesTest::~DiskIndexesTest() = default;
+
+TEST_F(DiskIndexesTest, simple_set_active_works)
+{
+ EXPECT_FALSE(isActive("index.flush.1"));
+ setActive("index.flush.1", 0);
+ EXPECT_TRUE(isActive("index.flush.1"));
+ notActive("index.flush.1");
+ EXPECT_FALSE(isActive("index.flush.1"));
+}
+
+TEST_F(DiskIndexesTest, nested_set_active_works)
+{
+ setActive("index.flush.1", 0);
+ setActive("index.flush.1", 0);
+ EXPECT_TRUE(isActive("index.flush.1"));
+ notActive("index.flush.1");
+ EXPECT_TRUE(isActive("index.flush.1"));
+ notActive("index.flush.1");
+ EXPECT_FALSE(isActive("index.flush.1"));
+}
+
+TEST_F(DiskIndexesTest, is_active_returns_false_for_bad_name)
+{
+ EXPECT_FALSE(isActive("foo/bar/baz"));
+ EXPECT_FALSE(isActive("index.flush.0"));
+}
+
+TEST_F(DiskIndexesTest, remove_works)
+{
+ EXPECT_TRUE(remove(IndexDiskDir()));
+ auto fusion1 = get_index_disk_dir("index.fusion.1");
+ EXPECT_TRUE(remove(fusion1));
+ add_not_active(fusion1);
+ EXPECT_TRUE(remove(fusion1));
+ setActive("index.fusion.1", 0);
+ EXPECT_FALSE(remove(fusion1));
+ notActive("index.fusion.1");
+ EXPECT_TRUE(remove(fusion1));
+}
+
+TEST_F(DiskIndexesTest, basic_get_transient_size_works)
+{
+ /*
+ * When starting to use a new fusion index, we have a transient
+ * period with two ISearchableIndexCollection instances:
+ * - old, containing index.fusion.1 and index.flush.2
+ * - new, containing index.fusion.2
+ */
+ setActive("index.fusion.1", 1000000);
+ setActive("index.flush.2", 500000);
+ setActive("index.fusion.2", 1200000);
+ auto fusion1 = get_index_disk_dir("index.fusion.1");
+ auto flush2 = get_index_disk_dir("index.flush.2");
+ auto fusion2 = get_index_disk_dir("index.fusion.2");
+ {
+ /*
+ * When using the old index collection, disk space used by
+ * index.fusion.2 is considered transient.
+ */
+ SCOPED_TRACE("index.fusion.1");
+ assert_transient_size(1200000, fusion1);
+ }
+ {
+ SCOPED_TRACE("index.flush.2");
+ assert_transient_size(0, flush2);
+ }
+ {
+ /*
+ * When using the new index collection, disk space used by
+ * index.fusion.1 and index.flush.2 is considered transient.
+ */
+ SCOPED_TRACE("index.fusion.2");
+ assert_transient_size(1500000, fusion2);
+ }
+ notActive("index.fusion.1");
+ notActive("index.flush.2");
+ {
+ /*
+ * old index collection removed.
+ */
+ SCOPED_TRACE("index.fusion.2 after remove of index.fusion.1 and index.flush.1");
+ assert_transient_size(0, fusion2);
+ }
+}
+
+TEST_F(DiskIndexesTest, get_transient_size_during_ongoing_fusion)
+{
+ /*
+ * During ongoing fusion, we have one ISearchableIndexCollection instance:
+ * - old, containing index.fusion.1 and index.flush.2
+ *
+ * Fusion output directory is index.fusion.2
+ */
+ setActive("index.fusion.1", 1000000);
+ setActive("index.flush.2", 500000);
+ auto fusion1 = get_index_disk_dir("index.fusion.1");
+ auto fusion2 = get_index_disk_dir("index.fusion.2");
+ add_not_active(fusion2); // start tracking disk space for fusion output
+ {
+ /*
+ * Fusion not yet started.
+ */
+ SCOPED_TRACE("dir missing");
+ assert_transient_size(0, fusion1);
+ }
+ auto dir = base_dir + "/index.fusion.2";
+ vespalib::mkdir(dir, true);
+ {
+ /*
+ * Fusion started, but no files written yet.
+ */
+ SCOPED_TRACE("empty dir");
+ assert_transient_size(0, fusion1);
+ }
+ constexpr uint32_t seek_pos = 999999;
+ {
+ std::string name = dir + "/foo";
+ std::ofstream ostr(name, std::ios::binary);
+ ostr.seekp(seek_pos);
+ ostr.write(" ", 1);
+ ostr.flush();
+ ostr.close();
+ }
+ {
+ /*
+ * Fusion started, one file written.
+ */
+ SCOPED_TRACE("single file");
+ assert_transient_size((seek_pos + block_size) / block_size * block_size, fusion1);
+ }
+ EXPECT_TRUE(remove(fusion2)); // stop tracking disk space for fusion output
+ {
+ /*
+ * Fusion aborted.
+ */
+ SCOPED_TRACE("removed");
+ assert_transient_size(0, fusion1);
+ }
+}
+
+}
+
+int
+main(int argc, char* argv[])
+{
+ vespalib::rmdir(base_dir, true);
+ ::testing::InitGoogleTest(&argc, argv);
+ auto result = RUN_ALL_TESTS();
+ vespalib::rmdir(base_dir, true);
+ return result;
+}
diff --git a/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt b/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt
new file mode 100644
index 00000000000..4e82cf1b9d2
--- /dev/null
+++ b/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcorespi_index_disk_layout_test_app
+ SOURCES
+ index_disk_layout_test.cpp
+ DEPENDS
+ searchcorespi
+ GTest::GTest
+)
+vespa_add_test(NAME searchcorespi_index_disk_layout_test_app COMMAND searchcorespi_index_disk_layout_test_app)
diff --git a/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp b/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp
new file mode 100644
index 00000000000..e35225b2745
--- /dev/null
+++ b/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp
@@ -0,0 +1,60 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/searchcorespi/index/indexdisklayout.h>
+#include <vespa/searchcorespi/index/index_disk_dir.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+namespace searchcorespi::index {
+
+namespace {
+
+void expect_index_disk_dir(IndexDiskDir exp, const vespalib::string& dir)
+{
+ auto act = IndexDiskLayout::get_index_disk_dir(dir);
+ ASSERT_TRUE(act.valid());
+ ASSERT_EQ(exp, act);
+}
+
+void expect_bad_index_disk_dir(const vespalib::string& dir)
+{
+ auto act = IndexDiskLayout::get_index_disk_dir(dir);
+ ASSERT_FALSE(act.valid());
+}
+
+}
+
+TEST(IndexDiskLayoutTest, get_index_disk_dir_works)
+{
+ {
+ SCOPED_TRACE("index.fusion.1");
+ expect_index_disk_dir(IndexDiskDir(1, true), "index.fusion.1");
+ }
+ {
+ SCOPED_TRACE("index.flush.2");
+ expect_index_disk_dir(IndexDiskDir(2, false), "index.flush.2");
+ }
+ {
+ SCOPED_TRACE("index.flush.3");
+ expect_index_disk_dir(IndexDiskDir(3, false), "index.flush.3");
+ }
+ {
+ SCOPED_TRACE("foo/bar/index.flush.4");
+ expect_index_disk_dir(IndexDiskDir(4, false), "foo/bar/index.flush.4");
+ }
+ {
+ SCOPED_TRACE("index.flush.");
+ expect_bad_index_disk_dir("index.flush.");
+ }
+ {
+ SCOPED_TRACE("index.flush.0");
+ expect_bad_index_disk_dir("index.flush.0");
+ }
+ {
+ SCOPED_TRACE("asdf");
+ expect_bad_index_disk_dir("asdf");
+ }
+}
+
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchcore/src/tests/proton/flushengine/CMakeLists.txt b/searchcore/src/tests/proton/flushengine/CMakeLists.txt
index b45c4fa411c..a47b3f2c93f 100644
--- a/searchcore/src/tests/proton/flushengine/CMakeLists.txt
+++ b/searchcore/src/tests/proton/flushengine/CMakeLists.txt
@@ -5,6 +5,7 @@ vespa_add_executable(searchcore_flushengine_test_app TEST
DEPENDS
searchcore_flushengine
searchcore_pcommon
+ searchcore_test
)
vespa_add_test(
NAME searchcore_flushengine_test_app
diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt
index 9f0c777a4d7..608f80e38aa 100644
--- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt
+++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt
@@ -3,8 +3,8 @@ vespa_add_executable(searchcore_flushengine_prepare_restart_flush_strategy_test_
SOURCES
prepare_restart_flush_strategy_test.cpp
DEPENDS
- searchcorespi
searchcore_flushengine
+ searchcore_test
)
vespa_add_test(
NAME searchcore_flushengine_prepare_restart_flush_strategy_test_app
diff --git a/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt b/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt
index 4a5dd2acb9d..aad11581e2f 100644
--- a/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt
+++ b/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt
@@ -4,5 +4,6 @@ vespa_add_executable(searchcore_documentdb_job_trackers_test_app TEST
documentdb_job_trackers_test.cpp
DEPENDS
searchcore_proton_metrics
+ searchcore_test
)
vespa_add_test(NAME searchcore_documentdb_job_trackers_test_app COMMAND searchcore_documentdb_job_trackers_test_app)
diff --git a/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt b/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt
index 6ab08602f67..4467bea4ab1 100644
--- a/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt
+++ b/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt
@@ -4,5 +4,6 @@ vespa_add_executable(searchcore_job_tracked_flush_test_app TEST
job_tracked_flush_test.cpp
DEPENDS
searchcore_proton_metrics
+ searchcore_test
)
vespa_add_test(NAME searchcore_job_tracked_flush_test_app COMMAND searchcore_job_tracked_flush_test_app)
diff --git a/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt b/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt
index 80c5521c763..207883aab0e 100644
--- a/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt
+++ b/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt
@@ -5,5 +5,6 @@ vespa_add_executable(searchcore_memoryflush_test_app TEST
DEPENDS
searchcore_server
searchcore_flushengine
+ searchcore_test
)
vespa_add_test(NAME searchcore_memoryflush_test_app COMMAND searchcore_memoryflush_test_app)
diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
index 60d3a05502a..501c5e468dd 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt
@@ -49,6 +49,7 @@ vespa_add_library(searchcore_bmcluster STATIC
searchcore_grouping
searchcore_proton_metrics
searchcore_fconfig
+ searchcorespi
storageserver_storageapp
messagebus_messagebus-test
messagebus
diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt
index 5f288a116b4..ff740f7a4ae 100644
--- a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt
@@ -20,4 +20,5 @@ vespa_add_library(searchcore_documentmetastore STATIC
searchcore_attribute
searchcore_bucketdb
searchcore_initializer
+ searchcorespi
)
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
index b506e889a97..a0e69af5b1e 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
@@ -16,4 +16,5 @@ vespa_add_library(searchcore_flushengine STATIC
tls_stats_factory.cpp
tls_stats_map.cpp
DEPENDS
+ searchcorespi
)
diff --git a/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt
index 7f5ece11366..ee9f0caded1 100644
--- a/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt
@@ -7,4 +7,5 @@ vespa_add_library(searchcore_index STATIC
indexmanager.cpp
memoryindexwrapper.cpp
DEPENDS
+ searchcorespi
)
diff --git a/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt
index 41e2fe2105f..1c203dd1284 100644
--- a/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt
@@ -43,4 +43,5 @@ vespa_add_library(searchcore_matching STATIC
viewresolver.cpp
DEPENDS
searchcore_grouping
+ searchcorespi
)
diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
index 3a2e443bfef..5df9060ea07 100644
--- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt
@@ -8,6 +8,7 @@ vespa_add_library(searchcore_test STATIC
clusterstatehandler.cpp
documentdb_config_builder.cpp
dummy_feed_view.cpp
+ dummy_flush_target.cpp
mock_index_manager.cpp
mock_shared_threading_service.cpp
userdocumentsbuilder.cpp
@@ -17,4 +18,5 @@ vespa_add_library(searchcore_test STATIC
DEPENDS
searchcore_server
searchcore_fconfig
+ searchcorespi
)
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp
new file mode 100644
index 00000000000..8915e3b367c
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "dummy_flush_target.h"
+
+namespace proton::test {
+
+DummyFlushTarget::DummyFlushTarget(const vespalib::string &name) noexcept
+ : searchcorespi::IFlushTarget(name)
+{}
+DummyFlushTarget::DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept
+ : searchcorespi::IFlushTarget(name, type, component)
+{}
+DummyFlushTarget::~DummyFlushTarget() = default;
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
index 3689b181c52..a9206233c9d 100644
--- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
+++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h
@@ -7,14 +7,9 @@ namespace proton::test {
struct DummyFlushTarget : public searchcorespi::IFlushTarget
{
- DummyFlushTarget(const vespalib::string &name) noexcept
- : searchcorespi::IFlushTarget(name)
- {}
- DummyFlushTarget(const vespalib::string &name,
- const Type &type,
- const Component &component) noexcept
- : searchcorespi::IFlushTarget(name, type, component)
- {}
+ DummyFlushTarget(const vespalib::string &name) noexcept;
+ DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept;
+ ~DummyFlushTarget() override;
MemoryGain getApproxMemoryGain() const override { return MemoryGain(0, 0); }
DiskGain getApproxDiskGain() const override { return DiskGain(0, 0); }
SerialNum getFlushedSerialNum() const override { return 0; }
diff --git a/searchcore/src/vespa/searchcorespi/.gitignore b/searchcore/src/vespa/searchcorespi/.gitignore
new file mode 100644
index 00000000000..9d6ecd51398
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/.gitignore
@@ -0,0 +1,3 @@
+/.depend
+/Makefile
+/libsearchcorespi.so.5.1
diff --git a/searchcore/src/vespa/searchcorespi/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/CMakeLists.txt
new file mode 100644
index 00000000000..fab1d007a4f
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/CMakeLists.txt
@@ -0,0 +1,7 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi STATIC
+ SOURCES
+ $<TARGET_OBJECTS:searchcorespi_flush>
+ $<TARGET_OBJECTS:searchcorespi_index>
+ DEPENDS
+)
diff --git a/searchcore/src/vespa/searchcorespi/flush/.gitignore b/searchcore/src/vespa/searchcorespi/flush/.gitignore
new file mode 100644
index 00000000000..7e7c0fe7fae
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/.gitignore
@@ -0,0 +1,2 @@
+/.depend
+/Makefile
diff --git a/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt
new file mode 100644
index 00000000000..b2777d4327a
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt
@@ -0,0 +1,6 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi_flush STATIC OBJECT
+ SOURCES
+ flushstats.cpp
+ DEPENDS
+)
diff --git a/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp b/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp
new file mode 100644
index 00000000000..28632219a28
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp
@@ -0,0 +1,13 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "flushstats.h"
+
+namespace searchcorespi {
+
+FlushStats::FlushStats() :
+ _path(),
+ _pathElementsToLog(6)
+{
+}
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/flush/flushstats.h b/searchcore/src/vespa/searchcorespi/flush/flushstats.h
new file mode 100644
index 00000000000..f92187b2112
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/flushstats.h
@@ -0,0 +1,28 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+
+/**
+ * Class with stats for what have been flushed.
+ */
+class FlushStats
+{
+private:
+ vespalib::string _path; // path to data flushed
+ size_t _pathElementsToLog;
+
+public:
+ FlushStats();
+
+ void setPath(const vespalib::string & path) { _path = path; }
+ void setPathElementsToLog(size_t numElems) { _pathElementsToLog = numElems; }
+
+ const vespalib::string & getPath() const { return _path; }
+ size_t getPathElementsToLog() const { return _pathElementsToLog; }
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/flush/flushtask.h b/searchcore/src/vespa/searchcorespi/flush/flushtask.h
new file mode 100644
index 00000000000..a4de3f65fff
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/flushtask.h
@@ -0,0 +1,18 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/searchlib/common/serialnum.h>
+
+namespace searchcorespi {
+
+class FlushTask : public vespalib::Executor::Task
+{
+public:
+ typedef std::unique_ptr<FlushTask> UP;
+
+ virtual search::SerialNum getFlushSerial() const = 0;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h
new file mode 100644
index 00000000000..dff6041d7d5
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h
@@ -0,0 +1,190 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "flushstats.h"
+#include "flushtask.h"
+#include <vespa/vespalib/util/time.h>
+#include <vector>
+
+namespace search { class IFlushToken; }
+
+namespace searchcorespi {
+
+/**
+ * This abstract class represents a flushable object that uses
+ * getApproxBytesBeforeFlush() bytes of memory, that will be reduced to
+ * getApproxBytesAfterFlush() if flushed.
+ */
+class IFlushTarget
+{
+public:
+ /**
+ * The flush types that a flush target can represent.
+ */
+ enum class Type {
+ FLUSH,
+ SYNC,
+ GC,
+ OTHER
+ };
+
+ /**
+ * The component types that a flush target can be used for.
+ */
+ enum class Component {
+ ATTRIBUTE,
+ INDEX,
+ DOCUMENT_STORE,
+ OTHER
+ };
+
+private:
+ vespalib::string _name;
+ Type _type;
+ Component _component;
+
+public:
+ template<typename T>
+ class Gain {
+ public:
+ Gain() noexcept : _before(0), _after(0) { }
+ Gain(T before, T after) noexcept : _before(before), _after(after) { }
+ T getBefore() const { return _before; }
+ T getAfter() const { return _after; }
+ T gain() const { return _before - _after; }
+ double gainRate() const { return (_before != 0) ? double(gain())/_before : 0;}
+ Gain & operator += (const Gain & b) { _before += b.getBefore(); _after += b.getAfter(); return *this; }
+ static Gain noGain(size_t currentSize) { return Gain(currentSize, currentSize); }
+ private:
+ T _before;
+ T _after;
+ };
+ using MemoryGain = Gain<int64_t>;
+ using DiskGain = Gain<int64_t>;
+ using SerialNum = search::SerialNum;
+ using Time = vespalib::system_time;
+
+ /**
+ * Convenience typedefs.
+ */
+ typedef std::shared_ptr<IFlushTarget> SP;
+ typedef std::vector<SP> List;
+ typedef FlushTask Task;
+
+ /**
+ * Constructs a new instance of this class.
+ *
+ * @param name The handler-wide unique name of this target.
+ */
+ IFlushTarget(const vespalib::string &name) noexcept
+ : _name(name),
+ _type(Type::OTHER),
+ _component(Component::OTHER)
+ { }
+
+ /**
+ * Constructs a new instance of this class.
+ *
+ * @param name The handler-wide unique name of this target.
+ * @param type The flush type of this target.
+ * @param component The component type of this target.
+ */
+ IFlushTarget(const vespalib::string &name,
+ const Type &type,
+ const Component &component) noexcept
+ : _name(name),
+ _type(type),
+ _component(component)
+ { }
+
+ /**
+ * Virtual destructor required for inheritance.
+ */
+ virtual ~IFlushTarget() = default;
+
+ /**
+ * Returns the handler-wide unique name of this target.
+ *
+ * @return The name of this.
+ */
+ const vespalib::string & getName() const { return _name; }
+
+ /**
+ * Returns the flush type of this target.
+ */
+ Type getType() const { return _type; }
+
+ /**
+ * Returns the component type of this target.
+ */
+ Component getComponent() const { return _component; }
+
+ /**
+ * Returns the approximate memory gain of this target, in bytes.
+ *
+ * @return The gain
+ */
+ virtual MemoryGain getApproxMemoryGain() const = 0;
+
+ /**
+ * Returns the approximate memory gain of this target, in bytes.
+ *
+ * @return The gain
+ */
+ virtual DiskGain getApproxDiskGain() const = 0;
+
+ /**
+ * Returns the approximate amount of bytes this target writes to disk if flushed.
+ */
+ virtual uint64_t getApproxBytesToWriteToDisk() const = 0;
+
+ /**
+ * Return cost of replaying a feed operation relative to cost of reading a feed operation from tls.
+ */
+ virtual double get_replay_operation_cost() const { return 0.0; }
+
+ /**
+ * Returns the last serial number for the transaction applied to
+ * target before it was flushed to disk. The transaction log can
+ * not be pruned beyond this.
+ *
+ * @return The last serial number represented in flushed target
+ */
+ virtual SerialNum getFlushedSerialNum() const = 0;
+
+ /**
+ * Returns the time of last flush.
+ *
+ * @return The last flush time.
+ */
+ virtual Time getLastFlushTime() const = 0;
+
+ /**
+ * Return if the target itself is in bad need for a flush.
+ *
+ * @return true if an urgent flush is needed
+ */
+ virtual bool needUrgentFlush() const { return false; }
+
+ /**
+ * Initiates the flushing of temporary memory. This method must perform
+ * everything required to allow another thread to complete the flush. This
+ * method is called by the flush scheduler thread.
+ *
+ * @param currentSerial The current transaction serial number.
+ * @return The task used to complete the flush.
+ */
+ virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0;
+
+ /**
+ * Returns the stats for the last completed flush operation
+ * for this flush target.
+ *
+ * @return The stats for the last flush.
+ */
+ virtual FlushStats getLastFlushStats() const = 0;
+
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h b/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h
new file mode 100644
index 00000000000..75737ce73d5
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "flushtask.h"
+
+namespace searchcorespi {
+
+template <class FunctionType>
+class LambdaFlushTask : public FlushTask {
+ FunctionType _func;
+ search::SerialNum _flushSerial;
+
+public:
+ LambdaFlushTask(FunctionType &&func, search::SerialNum flushSerial)
+ : _func(std::move(func)),
+ _flushSerial(flushSerial)
+ {}
+ ~LambdaFlushTask() override = default;
+ search::SerialNum getFlushSerial() const override { return _flushSerial; }
+ void run() override { _func(); }
+};
+
+template <class FunctionType>
+std::unique_ptr<FlushTask>
+makeLambdaFlushTask(FunctionType &&function, search::SerialNum flushSerial)
+{
+ return std::make_unique<LambdaFlushTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function), flushSerial);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/.gitignore b/searchcore/src/vespa/searchcorespi/index/.gitignore
new file mode 100644
index 00000000000..7e7c0fe7fae
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/.gitignore
@@ -0,0 +1,2 @@
+/.depend
+/Makefile
diff --git a/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt
new file mode 100644
index 00000000000..ca33131d7f4
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt
@@ -0,0 +1,28 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi_index STATIC OBJECT
+ SOURCES
+ diskindexcleaner.cpp
+ disk_indexes.cpp
+ disk_index_stats.cpp
+ eventlogger.cpp
+ fusionrunner.cpp
+ iindexmanager.cpp
+ iindexcollection.cpp
+ index_disk_dir_state.cpp
+ index_manager_explorer.cpp
+ index_manager_stats.cpp
+ indexcollection.cpp
+ indexdisklayout.cpp
+ indexflushtarget.cpp
+ indexfusiontarget.cpp
+ indexmaintainer.cpp
+ indexmaintainerconfig.cpp
+ indexmaintainercontext.cpp
+ indexmanagerconfig.cpp
+ indexreadutilities.cpp
+ index_searchable_stats.cpp
+ indexwriteutilities.cpp
+ warmupindexcollection.cpp
+ isearchableindexcollection.cpp
+ DEPENDS
+)
diff --git a/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp
new file mode 100644
index 00000000000..1b77061de8c
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "disk_index_stats.h"
+#include "idiskindex.h"
+
+
+namespace searchcorespi::index {
+
+DiskIndexStats::DiskIndexStats()
+ : IndexSearchableStats(),
+ _indexDir()
+{
+}
+
+DiskIndexStats::DiskIndexStats(const IDiskIndex &index)
+ : IndexSearchableStats(index),
+ _indexDir(index.getIndexDir())
+{
+}
+
+DiskIndexStats::~DiskIndexStats()
+{
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h
new file mode 100644
index 00000000000..831d95e95c1
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "index_searchable_stats.h"
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+struct IDiskIndex;
+
+/**
+ * Information about a disk index usable by state explorer.
+ */
+class DiskIndexStats : public IndexSearchableStats {
+ vespalib::string _indexDir;
+public:
+ DiskIndexStats();
+ DiskIndexStats(const IDiskIndex &index);
+ ~DiskIndexStats();
+
+ const vespalib::string &getIndexdir() const { return _indexDir; }
+};
+
+} // namespace searchcorespi::index
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp b/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp
new file mode 100644
index 00000000000..28f6a886d06
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp
@@ -0,0 +1,131 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "disk_indexes.h"
+#include "indexdisklayout.h"
+#include "index_disk_dir.h"
+#include "index_disk_dir_state.h"
+#include <vespa/searchlib/util/dirtraverse.h>
+#include <cassert>
+#include <vector>
+
+using vespalib::string;
+
+namespace searchcorespi::index {
+
+DiskIndexes::DiskIndexes() = default;
+DiskIndexes::~DiskIndexes() = default;
+
+void
+DiskIndexes::setActive(const string &index, uint64_t size_on_disk)
+{
+ auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index);
+ assert(index_disk_dir.valid());
+ std::lock_guard lock(_lock);
+ auto insres = _active.insert(std::make_pair(index_disk_dir, IndexDiskDirState()));
+ insres.first->second.activate();
+ if (!insres.first->second.get_size_on_disk().has_value()) {
+ insres.first->second.set_size_on_disk(size_on_disk);
+ }
+}
+
+void DiskIndexes::notActive(const string & index) {
+ auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index);
+ assert(index_disk_dir.valid());
+ std::lock_guard lock(_lock);
+ auto it = _active.find(index_disk_dir);
+ assert(it != _active.end());
+ assert(it->second.is_active());
+ it->second.deactivate();
+ if (!it->second.is_active()) {
+ _active.erase(it);
+ }
+}
+
+bool DiskIndexes::isActive(const string &index) const {
+ auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index);
+ if (!index_disk_dir.valid()) {
+ return false;
+ }
+ std::lock_guard lock(_lock);
+ auto it = _active.find(index_disk_dir);
+ return (it != _active.end()) && it->second.is_active();
+}
+
+
+void
+DiskIndexes::add_not_active(IndexDiskDir index_disk_dir)
+{
+ std::lock_guard lock(_lock);
+ _active.insert(std::make_pair(index_disk_dir, IndexDiskDirState()));
+}
+
+bool
+DiskIndexes::remove(IndexDiskDir index_disk_dir)
+{
+ if (!index_disk_dir.valid()) {
+ return true;
+ }
+ std::lock_guard lock(_lock);
+ auto it = _active.find(index_disk_dir);
+ if (it == _active.end()) {
+ return true;
+ }
+ if (it->second.is_active()) {
+ return false;
+ }
+ _active.erase(it);
+ return true;
+}
+
+uint64_t
+DiskIndexes::get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const
+{
+ /*
+ * Only report transient size related to a valid fusion index. This ensures
+ * that transient size is reported once per index collection.
+ */
+ if (!index_disk_dir.valid() || !index_disk_dir.is_fusion_index()) {
+ return 0u;
+ }
+ uint64_t transient_size = 0u;
+ std::vector<IndexDiskDir> deferred;
+ {
+ std::lock_guard lock(_lock);
+ for (auto &entry : _active) {
+ if (entry.first < index_disk_dir) {
+ /*
+ * Indexes before current fusion index are on the way out and
+ * will be removed when all older index collections
+ * referencing them are destroyed. Disk space used by these
+ * indexes is considered transient.
+ */
+ if (entry.second.get_size_on_disk().has_value()) {
+ transient_size += entry.second.get_size_on_disk().value();
+ }
+ }
+ if (index_disk_dir < entry.first && entry.first.is_fusion_index()) {
+ /*
+ * Fusion indexes after current fusion index can be partially
+ * complete and might be removed if fusion is aborted. Disk
+ * space used by these indexes is consider transient.
+ */
+ if (entry.second.get_size_on_disk().has_value()) {
+ transient_size += entry.second.get_size_on_disk().value();
+ } else {
+ deferred.emplace_back(entry.first);
+ }
+ }
+ }
+ }
+ for (auto& entry : deferred) {
+ auto index_dir = layout.getFusionDir(entry.get_id());
+ try {
+ search::DirectoryTraverse dirt(index_dir.c_str());
+ transient_size += dirt.GetTreeSize();
+ } catch (std::exception &) {
+ }
+ }
+ return transient_size;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/disk_indexes.h b/searchcore/src/vespa/searchcorespi/index/disk_indexes.h
new file mode 100644
index 00000000000..842c1814faf
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/disk_indexes.h
@@ -0,0 +1,46 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <map>
+#include <mutex>
+#include <memory>
+
+namespace searchcorespi::index {
+
+class IndexDiskDir;
+class IndexDiskDirState;
+class IndexDiskLayout;
+
+/**
+ * Class used to keep track of the set of disk indexes in an index maintainer.
+ * The index directories are used as identifiers.
+ *
+ * DiskIndexCleaner will remove old disk indexes not marked active,
+ * i.e. old disk indexes used by old index collections are not removed.
+ *
+ * At start of fusion, an entry for fusion output index is added, to allow for
+ * tracking of transient disk use while fusion is ongoing. If fusion fails then
+ * the entry is removed, otherwise the entry is marked active as a side effect
+ * of setting up a new index collection.
+ */
+class DiskIndexes {
+ std::map<IndexDiskDir, IndexDiskDirState> _active;
+ mutable std::mutex _lock;
+
+public:
+ using SP = std::shared_ptr<DiskIndexes>;
+ DiskIndexes();
+ ~DiskIndexes();
+ DiskIndexes(const DiskIndexes &) = delete;
+ DiskIndexes & operator = (const DiskIndexes &) = delete;
+ void setActive(const vespalib::string & index, uint64_t size_on_disk);
+ void notActive(const vespalib::string & index);
+ bool isActive(const vespalib::string & index) const;
+ void add_not_active(IndexDiskDir index_disk_dir);
+ bool remove(IndexDiskDir index_disk_dir);
+ uint64_t get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp
new file mode 100644
index 00000000000..3bed7ea8ea7
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp
@@ -0,0 +1,125 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "diskindexcleaner.h"
+#include "disk_indexes.h"
+#include "indexdisklayout.h"
+#include "index_disk_dir.h"
+#include <vespa/fastos/file.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <sstream>
+#include <vector>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.diskindexcleaner");
+
+using std::istringstream;
+using vespalib::string;
+using std::vector;
+
+namespace searchcorespi::index {
+
+namespace {
+vector<string> readIndexes(const string &base_dir) {
+ vector<string> indexes;
+ FastOS_DirectoryScan dir_scan(base_dir.c_str());
+ while (dir_scan.ReadNext()) {
+ string name = dir_scan.GetName();
+ if (!dir_scan.IsDirectory() || name.find("index.") != 0) {
+ continue;
+ }
+ indexes.push_back(name);
+ }
+ return indexes;
+}
+
+bool isValidIndex(const string &index_dir) {
+ FastOS_File serial_file((index_dir + "/serial.dat").c_str());
+ return serial_file.OpenReadOnlyExisting();
+}
+
+void invalidateIndex(const string &index_dir) {
+ vespalib::unlink(index_dir + "/serial.dat");
+ vespalib::File::sync(index_dir);
+}
+
+uint32_t findLastFusionId(const string &base_dir,
+ const vector<string> &indexes) {
+ uint32_t fusion_id = 0;
+ const string prefix = "index.fusion.";
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ if (indexes[i].find(prefix) != 0) {
+ continue;
+ }
+ if (!isValidIndex(base_dir + "/" + indexes[i])) {
+ continue;
+ }
+
+ uint32_t new_id = 0;
+ istringstream ist(indexes[i].substr(prefix.size()));
+ ist >> new_id;
+ fusion_id = std::max(fusion_id, new_id);
+ }
+ return fusion_id;
+}
+
+void removeDir(const string &dir) {
+ LOG(debug, "Removing index dir '%s'", dir.c_str());
+ invalidateIndex(dir);
+ vespalib::rmdir(dir, true);
+}
+
+bool isOldIndex(const string &index, uint32_t last_fusion_id) {
+ string::size_type pos = index.rfind(".");
+ istringstream ist(index.substr(pos + 1));
+ uint32_t id = last_fusion_id;
+ ist >> id;
+ if (id < last_fusion_id) {
+ return true;
+ } else if (id == last_fusion_id) {
+ return index.find("flush") != string::npos;
+ }
+ return false;
+}
+
+void removeOld(const string &base_dir, const vector<string> &indexes,
+ DiskIndexes &disk_indexes, bool remove) {
+ uint32_t last_fusion_id = findLastFusionId(base_dir, indexes);
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ const string index_dir = base_dir + "/" + indexes[i];
+ auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(indexes[i]);
+ if (isOldIndex(indexes[i], last_fusion_id) &&
+ disk_indexes.remove(index_disk_dir)) {
+ if (remove) {
+ removeDir(index_dir);
+ } else {
+ invalidateIndex(index_dir);
+ }
+ }
+ }
+}
+
+void removeInvalid(const string &base_dir, const vector<string> &indexes) {
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ const string index_dir = base_dir + "/" + indexes[i];
+ if (!isValidIndex(index_dir)) {
+ LOG(debug, "Found invalid index dir '%s'", index_dir.c_str());
+ removeDir(index_dir);
+ }
+ }
+}
+} // namespace
+
+void DiskIndexCleaner::clean(const string &base_dir,
+ DiskIndexes &disk_indexes) {
+ vector<string> indexes = readIndexes(base_dir);
+ removeOld(base_dir, indexes, disk_indexes, false);
+ removeInvalid(base_dir, indexes);
+}
+
+void DiskIndexCleaner::removeOldIndexes(
+ const string &base_dir, DiskIndexes &disk_indexes) {
+ vector<string> indexes = readIndexes(base_dir);
+ removeOld(base_dir, indexes, disk_indexes, true);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h
new file mode 100644
index 00000000000..cbd3a5aa94f
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+class DiskIndexes;
+
+/**
+ * Utility class used to clean and remove index directories.
+ */
+struct DiskIndexCleaner {
+ /**
+ * Deletes all indexes with id lower than the most recent fusion id.
+ */
+ static void clean(const vespalib::string &index_dir,
+ DiskIndexes& disk_indexes);
+ static void removeOldIndexes(const vespalib::string &index_dir,
+ DiskIndexes& disk_indexes);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp b/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp
new file mode 100644
index 00000000000..7a5b1bf907a
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "eventlogger.h"
+#include <vespa/searchlib/util/logutil.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.eventlogger");
+
+using vespalib::JSONStringer;
+using search::util::LogUtil;
+
+namespace searchcorespi::index {
+
+void
+EventLogger::diskIndexLoadStart(const vespalib::string &indexDir)
+{
+ JSONStringer jstr;
+ jstr.beginObject();
+ jstr.appendKey("input");
+ LogUtil::logDir(jstr, indexDir, 6);
+ jstr.endObject();
+ EV_STATE("diskindex.load.start", jstr.toString().data());
+}
+
+void
+EventLogger::diskIndexLoadComplete(const vespalib::string &indexDir,
+ int64_t elapsedTimeMs)
+{
+ JSONStringer jstr;
+ jstr.beginObject();
+ jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs);
+ jstr.appendKey("input");
+ LogUtil::logDir(jstr, indexDir, 6);
+ jstr.endObject();
+ EV_STATE("diskindex.load.complete", jstr.toString().data());
+}
+
+void
+EventLogger::diskFusionStart(const std::vector<vespalib::string> &sources,
+ const vespalib::string &fusionDir)
+{
+ JSONStringer jstr;
+ jstr.beginObject();
+ jstr.appendKey("inputs");
+ jstr.beginArray();
+ for (size_t i = 0; i < sources.size(); ++i) {
+ LogUtil::logDir(jstr, sources[i], 6);
+ }
+ jstr.endArray();
+ jstr.appendKey("output");
+ LogUtil::logDir(jstr, fusionDir, 6);
+ jstr.endObject();
+ EV_STATE("fusion.start", jstr.toString().data());
+}
+
+void
+EventLogger::diskFusionComplete(const vespalib::string &fusionDir,
+ int64_t elapsedTimeMs)
+{
+ JSONStringer jstr;
+ jstr.beginObject();
+ jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs);
+ jstr.appendKey("output");
+ LogUtil::logDir(jstr, fusionDir, 6);
+ jstr.endObject();
+ EV_STATE("fusion.complete", jstr.toString().data());
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/eventlogger.h b/searchcore/src/vespa/searchcorespi/index/eventlogger.h
new file mode 100644
index 00000000000..6191543dcb3
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/eventlogger.h
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <vector>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Class used to log various events related to disk index handling.
+ **/
+struct EventLogger {
+ static void diskIndexLoadStart(const vespalib::string &indexDir);
+ static void diskIndexLoadComplete(const vespalib::string &indexDir,
+ int64_t elapsedTimeMs);
+ static void diskFusionStart(const std::vector<vespalib::string> &sources,
+ const vespalib::string &fusionDir);
+ static void diskFusionComplete(const vespalib::string &fusionDir,
+ int64_t elapsedTimeMs);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h b/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h
new file mode 100644
index 00000000000..25a8b6847e9
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h
@@ -0,0 +1,50 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "indexsearchable.h"
+#include <vespa/searchlib/queryeval/fake_searchable.h>
+
+namespace searchcorespi {
+
+/**
+ * A fake index searchable used for unit testing.
+ */
+class FakeIndexSearchable : public IndexSearchable {
+private:
+ search::queryeval::FakeSearchable _fake;
+
+public:
+ FakeIndexSearchable() : _fake() { }
+
+ search::queryeval::FakeSearchable &getFake() { return _fake; }
+
+ /**
+ * Implements IndexSearchable
+ */
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term) override
+ {
+ return _fake.createBlueprint(requestContext, field, term);
+ }
+
+ search::SearchableStats getSearchableStats() const override {
+ return search::SearchableStats();
+ }
+
+ search::SerialNum getSerialNum() const override { return 0; }
+ void accept(IndexSearchableVisitor &visitor) const override {
+ (void) visitor;
+ }
+
+ search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override {
+ (void) field_name;
+ return search::index::FieldLengthInfo();
+ }
+
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp
new file mode 100644
index 00000000000..1675b6091cf
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp
@@ -0,0 +1,133 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "fusionrunner.h"
+#include "eventlogger.h"
+#include "fusionspec.h"
+#include <vespa/searchlib/common/serialnumfileheadercontext.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/util/dirtraverse.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.fusionrunner");
+
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::TuneFileIndexing;
+using search::common::FileHeaderContext;
+using search::common::SerialNumFileHeaderContext;
+using search::index::Schema;
+using search::queryeval::ISourceSelector;
+using search::diskindex::SelectorArray;
+using search::SerialNum;
+using std::vector;
+using vespalib::string;
+
+namespace searchcorespi::index {
+
+FusionRunner::FusionRunner(const string &base_dir,
+ const Schema &schema,
+ const TuneFileAttributes &tuneFileAttributes,
+ const FileHeaderContext &fileHeaderContext)
+ : _diskLayout(base_dir),
+ _schema(schema),
+ _tuneFileAttributes(tuneFileAttributes),
+ _fileHeaderContext(fileHeaderContext)
+{ }
+
+FusionRunner::~FusionRunner() = default;
+
+namespace {
+
+void readSelectorArray(const string &selector_name, SelectorArray &selector_array,
+ const vector<uint8_t> &id_map, uint32_t base_id, uint32_t fusion_id) {
+ FixedSourceSelector::UP selector =
+ FixedSourceSelector::load(selector_name, fusion_id);
+ if (base_id != selector->getBaseId()) {
+ selector = selector->cloneAndSubtract("tmp_for_fusion", base_id - selector->getBaseId());
+ }
+
+ const uint32_t num_docs = selector->getDocIdLimit();
+ selector_array.reserve(num_docs);
+ auto it = selector->createIterator();
+ for (uint32_t i = 0; i < num_docs; ++i) {
+ search::queryeval::Source source = it->getSource(i);
+ // Workaround for source selector corruption.
+ // Treat out of range source as last source.
+ if (source >= id_map.size()) {
+ source = id_map.size() - 1;
+ }
+ assert(source < id_map.size());
+ selector_array.push_back(id_map[source]);
+ }
+}
+
+bool
+writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id,
+ uint32_t highest_doc_id,
+ const TuneFileAttributes &tuneFileAttributes,
+ const FileHeaderContext &fileHeaderContext)
+{
+ const search::queryeval::Source default_source = 0;
+ FixedSourceSelector fusion_selector(default_source, "fusion_selector");
+ fusion_selector.setSource(highest_doc_id, default_source);
+ fusion_selector.setBaseId(fusion_id);
+ string selector_name = IndexDiskLayout::getSelectorFileName(diskLayout.getFusionDir(fusion_id));
+ if (!fusion_selector.extractSaveInfo(selector_name)->save(tuneFileAttributes, fileHeaderContext)) {
+ LOG(warning, "Unable to write source selector data for fusion.%u.", fusion_id);
+ return false;
+ }
+ return true;
+}
+} // namespace
+
+uint32_t
+FusionRunner::fuse(const FusionSpec &fusion_spec,
+ SerialNum lastSerialNum,
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token)
+{
+ const vector<uint32_t> &ids = fusion_spec.flush_ids;
+ if (ids.empty()) {
+ return 0;
+ }
+ const uint32_t fusion_id = ids.back();
+ const string fusion_dir = _diskLayout.getFusionDir(fusion_id);
+
+ vector<string> sources;
+ vector<uint8_t> id_map(fusion_id + 1);
+ if (fusion_spec.last_fusion_id != 0) {
+ id_map[0] = sources.size();
+ sources.push_back(_diskLayout.getFusionDir(fusion_spec.last_fusion_id));
+ }
+ for (uint32_t id : ids) {
+ id_map[id - fusion_spec.last_fusion_id] = sources.size();
+ sources.push_back(_diskLayout.getFlushDir(id));
+ }
+
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskFusionStart(sources, fusion_dir);
+ }
+ vespalib::Timer timer;
+
+ const string selector_name = IndexDiskLayout::getSelectorFileName(_diskLayout.getFlushDir(fusion_id));
+ SelectorArray selector_array;
+ readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id);
+
+ if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) {
+ return 0;
+ }
+
+ const uint32_t highest_doc_id = selector_array.size() - 1;
+ SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, lastSerialNum);
+ if (!writeFusionSelector(_diskLayout, fusion_id, highest_doc_id, _tuneFileAttributes, fileHeaderContext)) {
+ return 0;
+ }
+
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskFusionComplete(fusion_dir, vespalib::count_ms(timer.elapsed()));
+ }
+ return fusion_id;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/fusionrunner.h b/searchcore/src/vespa/searchcorespi/index/fusionrunner.h
new file mode 100644
index 00000000000..92ad42b76ad
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/fusionrunner.h
@@ -0,0 +1,65 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexmaintaineroperations.h"
+#include "indexdisklayout.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+
+namespace search
+{
+
+namespace common
+{
+
+class FileHeaderContext;
+
+}
+
+}
+
+namespace searchcorespi {
+namespace index {
+struct FusionSpec;
+
+/**
+ * FusionRunner runs fusion on a set of disk indexes, specified as a
+ * vector of ids. The disk indexes must be stored in directories named
+ * "index.flush.<id>" within the base dir, and the fusioned indexes
+ * will be stored similarly in directories named "index.fusion.<id>".
+ **/
+class FusionRunner {
+ const IndexDiskLayout _diskLayout;
+ const search::index::Schema _schema;
+ const search::TuneFileAttributes _tuneFileAttributes;
+ const search::common::FileHeaderContext &_fileHeaderContext;
+
+public:
+ /**
+ * Create a FusionRunner that operates on indexes stored in the
+ * base dir.
+ **/
+ FusionRunner(const vespalib::string &base_dir,
+ const search::index::Schema &schema,
+ const search::TuneFileAttributes &tuneFileAttributes,
+ const search::common::FileHeaderContext &fileHeaderContext);
+ ~FusionRunner();
+
+ /**
+ * Combine the indexes specified by the ids by running fusion.
+ *
+ * @param fusion_spec the specification on which indexes to run fusion on.
+ * @param lastSerialNum the serial number of the last flushed index part of the fusion spec.
+ * @param operations interface used for running the actual fusion.
+ * @return the id of the fusioned disk index
+ **/
+ uint32_t fuse(const FusionSpec &fusion_spec,
+ search::SerialNum lastSerialNum,
+ IIndexMaintainerOperations &operations,
+ std::shared_ptr<search::IFlushToken> flush_token);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/fusionspec.h b/searchcore/src/vespa/searchcorespi/index/fusionspec.h
new file mode 100644
index 00000000000..0b147140e55
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/fusionspec.h
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+namespace searchcorespi::index {
+
+/**
+ * Specifies a set of disk index ids for fusion.
+ *
+ * Note: All ids in FusionSpec are absolute ids.
+ **/
+struct FusionSpec {
+ uint32_t last_fusion_id;
+ std::vector<uint32_t> flush_ids;
+
+ FusionSpec() : last_fusion_id(0), flush_ids() {}
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/i_thread_service.h b/searchcore/src/vespa/searchcorespi/index/i_thread_service.h
new file mode 100644
index 00000000000..f973908b62d
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/i_thread_service.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/runnable.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Interface for a single thread used for write tasks.
+ */
+struct IThreadService : public vespalib::ThreadExecutor
+{
+ IThreadService(const IThreadService &) = delete;
+ IThreadService & operator = (const IThreadService &) = delete;
+ IThreadService() = default;
+ virtual ~IThreadService() {}
+
+ /**
+ * Run the given runnable in the underlying thread and wait until its done.
+ */
+ virtual void run(vespalib::Runnable &runnable) = 0;
+
+ /**
+ * Returns whether the current thread is the underlying thread.
+ */
+ virtual bool isCurrentThread() const = 0;
+};
+
+struct ISyncableThreadService : public IThreadService, vespalib::Syncable {
+
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/idiskindex.h b/searchcore/src/vespa/searchcorespi/index/idiskindex.h
new file mode 100644
index 00000000000..010e7e7727d
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/idiskindex.h
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchcorespi/index/indexsearchable.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Interface for a disk index as seen from an index maintainer.
+ */
+struct IDiskIndex : public IndexSearchable {
+ using SP = std::shared_ptr<IDiskIndex>;
+ virtual ~IDiskIndex() {}
+
+ /**
+ * Returns the directory in which this disk index exists.
+ */
+ virtual const vespalib::string &getIndexDir() const = 0;
+
+ /**
+ * Returns the schema used by this disk index.
+ * Note that the schema should be part of the index on disk.
+ */
+ virtual const search::index::Schema &getSchema() const = 0;
+};
+
+}
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp
new file mode 100644
index 00000000000..988c0084d4f
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp
@@ -0,0 +1,46 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "iindexcollection.h"
+#include "idiskindex.h"
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/searchlib/queryeval/isourceselector.h>
+
+namespace searchcorespi {
+
+using index::IDiskIndex;
+
+vespalib::string IIndexCollection::toString() const
+{
+ vespalib::asciistream s;
+ s << "selector : " << &getSourceSelector() << "(baseId=" << getSourceSelector().getBaseId()
+ << ", docidlimit=" << getSourceSelector().getDocIdLimit()
+ << ", defaultsource=" << uint32_t(getSourceSelector().getDefaultSource())
+ << ")\n";
+#if 0
+ search::queryeval::ISourceSelector::Iterator::UP it = getSourceSelector().createIterator();
+ s << "{";
+ for (size_t i(0), m(getSourceSelector().getDocIdLimit()); i < m; i++) {
+ s << uint32_t(it->getSource(i)) << ' ';
+ }
+ s << "}\n";
+#endif
+ s << getSourceCount() << " {";
+ if (getSourceCount() > 0) {
+ for (size_t i(0), m(getSourceCount()); i < m; i++) {
+ if (i != 0) {
+ s << ", ";
+ }
+ const IndexSearchable & is(getSearchable(i));
+ s << getSourceId(i) << " : " << &is << "(";
+ if (dynamic_cast<const IDiskIndex *>(&is) != NULL) {
+ s << dynamic_cast<const IDiskIndex &>(is).getIndexDir().c_str();
+ } else {
+ s << typeid(is).name();
+ }
+ s << ")";
+ }
+ }
+ s << "}";
+ return s.str();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexcollection.h b/searchcore/src/vespa/searchcorespi/index/iindexcollection.h
new file mode 100644
index 00000000000..1cbf994b44f
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/iindexcollection.h
@@ -0,0 +1,49 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "indexsearchable.h"
+
+namespace search::queryeval { class ISourceSelector; }
+namespace searchcorespi {
+
+/**
+ * Interface for a set of index searchables with source ids,
+ * and a source selector for determining which index searchable to use for each document.
+ */
+class IIndexCollection {
+protected:
+ using ISourceSelector = search::queryeval::ISourceSelector;
+public:
+ typedef std::unique_ptr<IIndexCollection> UP;
+ typedef std::shared_ptr<IIndexCollection> SP;
+
+ virtual ~IIndexCollection() {}
+
+ /**
+ * Returns the source selector used to determine which index to use for each document.
+ */
+ virtual const ISourceSelector &getSourceSelector() const = 0;
+
+ /**
+ * Returns the number sources (index searchables) for this collection.
+ */
+ virtual size_t getSourceCount() const = 0;
+
+ /**
+ * Returns the index searchable for source i (i in the range [0, getSourceCount()>).
+ */
+ virtual IndexSearchable &getSearchable(uint32_t i) const = 0;
+
+ /**
+ * Returns the source id for source i (i in the range [0, getSourceCount()>).
+ * The source id is used for this source in the source selector.
+ */
+ virtual uint32_t getSourceId(uint32_t i) const = 0;
+
+ virtual vespalib::string toString() const;
+
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
new file mode 100644
index 00000000000..9025b56dc27
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "idiskindex.h"
+#include "imemoryindex.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/diskindex/docidmapper.h>
+#include <vespa/searchlib/index/i_field_length_inspector.h>
+
+namespace search { class IFlushToken; }
+
+namespace searchcorespi::index {
+
+/**
+ * Interface for operations needed by an index maintainer.
+ */
+struct IIndexMaintainerOperations {
+ using IFieldLengthInspector = search::index::IFieldLengthInspector;
+ using Schema = search::index::Schema;
+ using SelectorArray = search::diskindex::SelectorArray;
+ virtual ~IIndexMaintainerOperations() {}
+
+ /**
+ * Creates a new memory index using the given schema.
+ */
+ virtual IMemoryIndex::SP createMemoryIndex(const Schema& schema,
+ const IFieldLengthInspector& inspector,
+ search::SerialNum serialNum) = 0;
+
+ /**
+ * Loads a disk index from the given directory.
+ */
+ virtual IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir) = 0;
+
+ /**
+ * Reloads the given disk index and returns a new instance.
+ */
+ virtual IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex) = 0;
+
+ /**
+ * Runs fusion on a given set of input disk indexes to create a fusioned output disk index.
+ * The selector array contains a source for all local document ids ([0, docIdLimit>)
+ * in the range [0, sources.size()> and is used to determine in which input disk index
+ * a document is located.
+ *
+ * @param schema the schema of the resulting fusioned disk index.
+ * @param outputDir the output directory of the fusioned disk index.
+ * @param sources the directories of the input disk indexes.
+ * @param selectorArray the array specifying in which input disk index a document is located.
+ * @param lastSerialNum the serial number of the last operation in the last input disk index.
+ */
+ virtual bool runFusion(const Schema &schema,
+ const vespalib::string &outputDir,
+ const std::vector<vespalib::string> &sources,
+ const SelectorArray &selectorArray,
+ search::SerialNum lastSerialNum,
+ std::shared_ptr<search::IFlushToken> flush_token) = 0;
+};
+
+}
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp b/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp
new file mode 100644
index 00000000000..70770f6f012
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp
@@ -0,0 +1,8 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "iindexmanager.h"
+
+namespace searchcorespi {
+
+IIndexManager::Reconfigurer::~Reconfigurer() = default;
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h
new file mode 100644
index 00000000000..a4173b41aa5
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h
@@ -0,0 +1,206 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "indexsearchable.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchcorespi/flush/flushstats.h>
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include <vespa/searchlib/common/serialnum.h>
+
+namespace vespalib { class IDestructorCallback; }
+namespace document { class Document; }
+
+namespace searchcorespi {
+
+/**
+ * Interface for an index manager:
+ * - Keeps track of a set of indexes (i.e. both memory indexes and disk indexes).
+ * - Documents can be inserted, updated or removed in/from the active memory index.
+ * - Enables search across all the indexes.
+ * - Manages the set of indexes through flush targets to the flush engine (i.e. flushing of memory
+ * indexes and fusion of disk indexes).
+ *
+ * Key items in this interface is <em>lid</em> which is a local document id assigned to each document.
+ * This is a numeric id in the range [0...Maximum number of documents concurrently in this DB>.
+ * Local document id 0 is reserved. Another key item is the <em>serialnumber</em> which is the
+ * serial number an operation was given when first seen by the searchcore. This is a monotonic
+ * increasing number used for sequencing operations and figuring out how up to date componets are.
+ * Used during restart/replay and for deciding when to flush. Both the lid and the serial number
+ * are persisted alongside an operation to ensure correct playback during recovery.
+ */
+class IIndexManager {
+protected:
+ using Document = document::Document;
+ using SerialNum = search::SerialNum;
+ using Schema = search::index::Schema;
+ using LidVector = std::vector<uint32_t>;
+public:
+ using OnWriteDoneType = const std::shared_ptr<vespalib::IDestructorCallback> &;
+
+ struct Configure {
+ virtual ~Configure() = default;
+ virtual bool configure() = 0;
+ };
+ template <class FunctionType>
+ class LambdaConfigure : public Configure {
+ FunctionType _func;
+
+ public:
+ LambdaConfigure(FunctionType &&func)
+ : _func(std::move(func))
+ {}
+ ~LambdaConfigure() override = default;
+ bool configure() override { return _func(); }
+ };
+
+ template <class FunctionType>
+ static std::unique_ptr<Configure>
+ makeLambdaConfigure(FunctionType &&function)
+ {
+ return std::make_unique<LambdaConfigure<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+ }
+
+ /**
+ * Interface used to signal when index manager has been reconfigured.
+ */
+ struct Reconfigurer {
+ using Configure = searchcorespi::IIndexManager::Configure;
+ virtual ~Reconfigurer();
+ /**
+ * Reconfigure index manager and infrastructure around it while system is in a quiescent state.
+ */
+ virtual bool reconfigure(std::unique_ptr<Configure> configure) = 0;
+ };
+
+ typedef std::unique_ptr<IIndexManager> UP;
+ typedef std::shared_ptr<IIndexManager> SP;
+
+ virtual ~IIndexManager() = default;
+
+ /**
+ * Inserts a document into the index. This method is async, caller
+ * must either wait for notification about write done or sync
+ * indexFieldWriter executor in threading service to get sync
+ * behavior.
+ *
+ * If the inserted document id already exist the old version must
+ * be removed before inserting the new.
+ *
+ * @param lid The local document id for the document.
+ *
+ * @param doc The document to insert.
+ *
+ * @param serialNum The unique monotoninc increasing serial number
+ * for this operation.
+ *
+ * @param on_write_done shared object that notifies write done when
+ * destructed.
+ **/
+ virtual void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done) = 0;
+
+ /**
+ * Removes the given document from the index. This method is
+ * async, caller must either wait for notification about write
+ * done or sync indexFieldWriter executor in threading service to
+ * get sync behavior.
+ *
+ * @param lid The local document id for the document.
+ *
+ * @param serialNum The unique monotoninc increasing serial number
+ * for this operation.
+ **/
+ void removeDocument(uint32_t lid, SerialNum serialNum) {
+ LidVector lids;
+ lids.push_back(lid);
+ removeDocuments(std::move(lids), serialNum);
+ }
+ virtual void removeDocuments(LidVector lids, SerialNum serialNum) = 0;
+
+ /**
+ * Commits the document puts and removes since the last commit,
+ * making them searchable. This method is async, caller must
+ * either wait for notification about write done or sync
+ * indexFieldWriter executor in threading service to get sync
+ * behavior.
+ *
+ * @param serialNum The unique monotoninc increasing serial number
+ * for this operation.
+ *
+ * @param onWriteDone shared object that notifies write done when
+ * destructed.
+ **/
+ virtual void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) = 0;
+
+ /**
+ * This method is called on a regular basis to update each component with what is the highest
+ * serial number for any component. This is for all components to be able to correctly tell its age.
+ *
+ * @param serialNum The serial number of the last known operation.
+ */
+ virtual void heartBeat(SerialNum serialNum) = 0;
+
+ /**
+ * This method is called when lid space is compacted.
+ *
+ * @param lidLimit The new lid limit.
+ * @param serialNum The serial number of the lid space compaction operation.
+ */
+ virtual void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) = 0;
+
+ /**
+ * Returns the current serial number of the index.
+ * This should also reflect any heart beats.
+ *
+ * @return current serial number of the component.
+ **/
+ virtual SerialNum getCurrentSerialNum() const = 0;
+
+ /**
+ * Returns the serial number of the last flushed index.
+ *
+ * @return the serial number of the last flushed index.
+ **/
+ virtual SerialNum getFlushedSerialNum() const = 0;
+
+ /**
+ * Returns the searchable that will give the correct search view of the index manager.
+ * Normally switched everytime underlying index structures are changed in a way that can not be
+ * handled in a thread safe way without locking. For instance flushing of memory index or
+ * starting using a new schema.
+ *
+ * @return the current searchable.
+ **/
+ virtual IndexSearchable::SP getSearchable() const = 0;
+
+ /**
+ * Returns searchable stats for this index manager.
+ *
+ * @return statistics gathered about underlying memory and disk indexes.
+ */
+ virtual search::SearchableStats getSearchableStats() const = 0;
+
+ /**
+ * Returns the list of all flush targets contained in this index manager.
+ *
+ * @return The list of flushable items in this component.
+ **/
+ virtual IFlushTarget::List getFlushTargets() = 0;
+
+ /**
+ * Sets the new schema to be used by this index manager.
+ *
+ * @param schema The new schema to start using.
+ **/
+ virtual void setSchema(const Schema &schema, SerialNum serialNum) = 0;
+
+ /*
+ * Sets the max number of flushed indexes before fusion is urgent.
+ *
+ * @param maxFlushed The max number of flushed indexes before fusion is urgent.
+ */
+ virtual void setMaxFlushed(uint32_t maxFlushed) = 0;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/imemoryindex.h b/searchcore/src/vespa/searchcorespi/index/imemoryindex.h
new file mode 100644
index 00000000000..67d6e034080
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/imemoryindex.h
@@ -0,0 +1,85 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchcorespi/index/indexsearchable.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/util/memoryusage.h>
+
+namespace vespalib { class IDestructorCallback; }
+namespace document { class Document; }
+namespace searchcorespi::index {
+
+/**
+ * Interface for a memory index as seen from an index maintainer.
+ */
+struct IMemoryIndex : public searchcorespi::IndexSearchable {
+ using LidVector = std::vector<uint32_t>;
+ using SP = std::shared_ptr<IMemoryIndex>;
+ using OnWriteDoneType = const std::shared_ptr<vespalib::IDestructorCallback> &;
+ virtual ~IMemoryIndex() {}
+
+ /**
+ * Returns true if this memory index has received any document insert operations.
+ */
+ virtual bool hasReceivedDocumentInsert() const = 0;
+
+ /**
+ * Returns the memory usage of this memory index.
+ */
+ virtual vespalib::MemoryUsage getMemoryUsage() const = 0;
+
+ /**
+ * Returns the memory usage of an empty version of this memory index.
+ */
+ virtual uint64_t getStaticMemoryFootprint() const = 0;
+
+ /**
+ * Inserts the given document into this memory index.
+ * If the document already exists it should be removed first.
+ *
+ * @param lid the local document id.
+ * @param doc the document to insert.
+ * @param on_write_done shared object that notifies write done when destructed.
+ */
+ virtual void insertDocument(uint32_t lid, const document::Document &doc, OnWriteDoneType on_write_done) = 0;
+
+ /**
+ * Removes the given document from this memory index.
+ *
+ * @param lid the local document id.
+ */
+ void removeDocument(uint32_t lid) {
+ LidVector lids;
+ lids.push_back(lid);
+ removeDocuments(std::move(lids));
+ }
+ virtual void removeDocuments(LidVector lids) = 0;
+
+ /**
+ * Commits the inserts and removes since the last commit, making them searchable.
+ **/
+ virtual void commit(OnWriteDoneType onWriteDone, search::SerialNum serialNum) = 0;
+
+ /**
+ * Flushes this memory index to disk as a disk index.
+ * After a flush it should be possible to load a IDiskIndex from the flush directory.
+ * Note that the schema used when constructing the memory index should be flushed as well
+ * since a IDiskIndex should be able to return the schema used by the disk index.
+ *
+ * @param flushDir the directory in which to save the flushed index.
+ * @param docIdLimit the largest local document id used + 1
+ * @param serialNum the serial number of the last operation to the memory index.
+ */
+ virtual void flushToDisk(const vespalib::string &flushDir,
+ uint32_t docIdLimit,
+ search::SerialNum serialNum) = 0;
+
+ virtual void pruneRemovedFields(const search::index::Schema &schema) = 0;
+ virtual search::index::Schema::SP getPrunedSchema() const = 0;
+};
+
+}
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h b/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h
new file mode 100644
index 00000000000..335838ddf2e
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h
@@ -0,0 +1,36 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace searchcorespi::index {
+
+/*
+ * Class naming a disk index for a document type.
+ */
+class IndexDiskDir {
+ uint32_t _id;
+ bool _fusion;
+public:
+ IndexDiskDir(uint32_t id, bool fusion) noexcept
+ : _id(id),
+ _fusion(fusion)
+ {
+ }
+ IndexDiskDir() noexcept
+ : IndexDiskDir(0, false)
+ {
+ }
+ bool operator<(const IndexDiskDir& rhs) const noexcept {
+ if (_id != rhs._id) {
+ return _id < rhs._id;
+ }
+ return !_fusion && rhs._fusion;
+ }
+ bool operator==(const IndexDiskDir& rhs) const noexcept {
+ return (_id == rhs._id) && (_fusion == rhs._fusion);
+ }
+ bool valid() const noexcept { return _id != 0u; }
+ bool is_fusion_index() const noexcept { return _fusion; }
+ uint64_t get_id() const noexcept { return _id; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp
new file mode 100644
index 00000000000..ffe33d704c8
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "index_disk_dir_state.h"
+#include <cassert>
+
+namespace searchcorespi::index {
+
+void
+IndexDiskDirState::deactivate() noexcept
+{
+ assert(_active_count > 0u);
+ --_active_count;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h
new file mode 100644
index 00000000000..d8b790b3960
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h
@@ -0,0 +1,30 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+
+namespace searchcorespi::index {
+
+/*
+ * Class describing state for a disk index directory.
+ */
+class IndexDiskDirState {
+ uint32_t _active_count;
+ std::optional<uint64_t> _size_on_disk;
+public:
+ IndexDiskDirState()
+ : _active_count(0),
+ _size_on_disk()
+ {
+ }
+
+ void activate() noexcept { ++_active_count; }
+ void deactivate() noexcept;
+ bool is_active() const noexcept { return _active_count != 0; }
+ const std::optional<uint64_t>& get_size_on_disk() const noexcept { return _size_on_disk; }
+ void set_size_on_disk(uint64_t size_on_disk) noexcept { _size_on_disk = size_on_disk; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp
new file mode 100644
index 00000000000..855f3c69bc9
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp
@@ -0,0 +1,74 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "index_manager_explorer.h"
+#include "index_manager_stats.h"
+
+#include <vespa/vespalib/data/slime/cursor.h>
+
+using vespalib::slime::Cursor;
+using vespalib::slime::Inserter;
+using search::SearchableStats;
+using searchcorespi::index::DiskIndexStats;
+using searchcorespi::index::MemoryIndexStats;
+
+namespace searchcorespi {
+
+namespace {
+
+void
+insertDiskIndex(Cursor &arrayCursor, const DiskIndexStats &diskIndex)
+{
+ Cursor &diskIndexCursor = arrayCursor.addObject();
+ const SearchableStats &sstats = diskIndex.getSearchableStats();
+ diskIndexCursor.setLong("serialNum", diskIndex.getSerialNum());
+ diskIndexCursor.setString("indexDir", diskIndex.getIndexdir());
+ diskIndexCursor.setLong("sizeOnDisk", sstats.sizeOnDisk());
+}
+
+void
+insertMemoryUsage(Cursor &object, const vespalib::MemoryUsage &usage)
+{
+ Cursor &memory = object.setObject("memoryUsage");
+ memory.setLong("allocatedBytes", usage.allocatedBytes());
+ memory.setLong("usedBytes", usage.usedBytes());
+ memory.setLong("deadBytes", usage.deadBytes());
+ memory.setLong("onHoldBytes", usage.allocatedBytesOnHold());
+}
+
+void
+insertMemoryIndex(Cursor &arrayCursor, const MemoryIndexStats &memoryIndex)
+{
+ Cursor &memoryIndexCursor = arrayCursor.addObject();
+ const SearchableStats &sstats = memoryIndex.getSearchableStats();
+ memoryIndexCursor.setLong("serialNum", memoryIndex.getSerialNum());
+ memoryIndexCursor.setLong("docsInMemory", sstats.docsInMemory());
+ insertMemoryUsage(memoryIndexCursor, sstats.memoryUsage());
+}
+
+}
+
+
+IndexManagerExplorer::IndexManagerExplorer(IIndexManager::SP mgr)
+ : _mgr(std::move(mgr))
+{
+}
+
+void
+IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const
+{
+ Cursor &object = inserter.insertObject();
+ object.setLong("lastSerialNum", _mgr->getCurrentSerialNum());
+ if (full) {
+ IndexManagerStats stats(*_mgr);
+ Cursor &diskIndexArrayCursor = object.setArray("diskIndexes");
+ for (const auto &diskIndex : stats.getDiskIndexes()) {
+ insertDiskIndex(diskIndexArrayCursor, diskIndex);
+ }
+ Cursor &memoryIndexArrayCursor = object.setArray("memoryIndexes");
+ for (const auto &memoryIndex : stats.getMemoryIndexes()) {
+ insertMemoryIndex(memoryIndexArrayCursor, memoryIndex);
+ }
+ }
+}
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h
new file mode 100644
index 00000000000..3e52199eeda
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h
@@ -0,0 +1,25 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexmanager.h"
+#include <vespa/vespalib/net/state_explorer.h>
+
+namespace searchcorespi {
+
+/**
+ * Class used to explore the state of an index manager.
+ */
+class IndexManagerExplorer : public vespalib::StateExplorer
+{
+private:
+ IIndexManager::SP _mgr;
+
+public:
+ IndexManagerExplorer(IIndexManager::SP mgr);
+
+ // Implements vespalib::StateExplorer
+ virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override;
+};
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp
new file mode 100644
index 00000000000..a93934c1500
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp
@@ -0,0 +1,58 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "index_manager_stats.h"
+#include "iindexmanager.h"
+#include "indexsearchablevisitor.h"
+#include "imemoryindex.h"
+
+namespace searchcorespi {
+
+namespace {
+
+class Visitor : public IndexSearchableVisitor
+{
+public:
+ std::vector<index::DiskIndexStats> _diskIndexes;
+ std::vector<index::MemoryIndexStats> _memoryIndexes;
+
+ Visitor();
+ ~Visitor();
+ void visit(const index::IDiskIndex &index) override {
+ _diskIndexes.emplace_back(index);
+ }
+ void visit(const index::IMemoryIndex &index) override {
+ _memoryIndexes.emplace_back(index);
+ }
+
+ void normalize() {
+ std::sort(_diskIndexes.begin(), _diskIndexes.end());
+ std::sort(_memoryIndexes.begin(), _memoryIndexes.end());
+ }
+};
+
+Visitor::Visitor() = default;
+Visitor::~Visitor() = default;
+
+}
+
+IndexManagerStats::IndexManagerStats()
+ : _diskIndexes(),
+ _memoryIndexes()
+{
+}
+
+IndexManagerStats::IndexManagerStats(const IIndexManager &indexManager)
+ : _diskIndexes(),
+ _memoryIndexes()
+{
+ Visitor visitor;
+ IndexSearchable::SP searchable(indexManager.getSearchable());
+ searchable->accept(visitor);
+ visitor.normalize();
+ _diskIndexes = std::move(visitor._diskIndexes);
+ _memoryIndexes = std::move(visitor._memoryIndexes);
+}
+
+IndexManagerStats::~IndexManagerStats() = default;
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h
new file mode 100644
index 00000000000..1e218a62660
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h
@@ -0,0 +1,31 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "disk_index_stats.h"
+#include "memory_index_stats.h"
+#include <vector>
+
+namespace searchcorespi {
+
+class IIndexManager;
+
+/**
+ * Information about an index manager usable by state explorer.
+ */
+class IndexManagerStats {
+ std::vector<index::DiskIndexStats> _diskIndexes;
+ std::vector<index::MemoryIndexStats> _memoryIndexes;
+public:
+ IndexManagerStats();
+ IndexManagerStats(const IIndexManager &indexManager);
+ ~IndexManagerStats();
+
+ const std::vector<index::DiskIndexStats> &getDiskIndexes() const {
+ return _diskIndexes;
+ }
+ const std::vector<index::MemoryIndexStats> &getMemoryIndexes() const {
+ return _memoryIndexes;
+ }
+};
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp
new file mode 100644
index 00000000000..92de7d2d292
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "index_searchable_stats.h"
+#include "indexsearchable.h"
+
+
+namespace searchcorespi::index {
+
+IndexSearchableStats::IndexSearchableStats()
+ : _serialNum(0),
+ _searchableStats()
+{
+}
+
+IndexSearchableStats::IndexSearchableStats(const IndexSearchable &index)
+ : _serialNum(index.getSerialNum()),
+ _searchableStats(index.getSearchableStats())
+{
+}
+
+bool IndexSearchableStats::operator<(const IndexSearchableStats &rhs) const
+{
+ return _serialNum < rhs._serialNum;
+}
+
+} // namespace searchcorespi::index
diff --git a/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h
new file mode 100644
index 00000000000..a61245ddb5d
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/util/searchable_stats.h>
+
+namespace searchcorespi { class IndexSearchable; }
+
+namespace searchcorespi::index {
+
+/**
+ * Information about a searchable index usable by state explorer.
+ */
+class IndexSearchableStats
+{
+ using SerialNum = search::SerialNum;
+ using SearchableStats = search::SearchableStats;
+ SerialNum _serialNum;
+ SearchableStats _searchableStats;
+public:
+ IndexSearchableStats();
+ IndexSearchableStats(const IndexSearchable &index);
+ bool operator<(const IndexSearchableStats &rhs) const;
+ SerialNum getSerialNum() const { return _serialNum; }
+ const SearchableStats &getSearchableStats() const { return _searchableStats; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp
new file mode 100644
index 00000000000..d69f7d1b0a4
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp
@@ -0,0 +1,253 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexcollection.h"
+#include "indexsearchablevisitor.h"
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h>
+#include <vespa/searchlib/queryeval/intermediate_blueprints.h>
+#include <vespa/searchlib/queryeval/leaf_blueprints.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexcollection");
+
+using namespace search::queryeval;
+using namespace search::query;
+using search::attribute::IAttributeContext;
+using search::index::FieldLengthInfo;
+
+namespace searchcorespi {
+
+IndexCollection::IndexCollection(const ISourceSelector::SP & selector)
+ : _source_selector(selector),
+ _sources()
+{
+}
+
+IndexCollection::IndexCollection(const ISourceSelector::SP & selector,
+ const ISearchableIndexCollection &sources)
+ : _source_selector(selector),
+ _sources()
+{
+ for (size_t i(0), m(sources.getSourceCount()); i < m; i++) {
+ append(sources.getSourceId(i), sources.getSearchableSP(i));
+ }
+ setCurrentIndex(sources.getCurrentIndex());
+}
+
+IndexCollection::~IndexCollection() = default;
+
+void
+IndexCollection::setSource(uint32_t docId)
+{
+ assert( valid() );
+ _source_selector->setSource(docId, getCurrentIndex());
+}
+
+ISearchableIndexCollection::UP
+IndexCollection::replaceAndRenumber(const ISourceSelector::SP & selector,
+ const ISearchableIndexCollection &fsc,
+ uint32_t id_diff,
+ const IndexSearchable::SP &new_source)
+{
+ auto new_fsc = std::make_unique<IndexCollection>(selector);
+ new_fsc->append(0, new_source);
+ for (size_t i = 0; i < fsc.getSourceCount(); ++i) {
+ if (fsc.getSourceId(i) > id_diff) {
+ new_fsc->append(fsc.getSourceId(i) - id_diff, fsc.getSearchableSP(i));
+ }
+ }
+ return new_fsc;
+}
+
+void
+IndexCollection::append(uint32_t id, const IndexSearchable::SP &fs)
+{
+ _sources.push_back(SourceWithId(id, fs));
+}
+
+IndexSearchable::SP
+IndexCollection::getSearchableSP(uint32_t i) const
+{
+ return _sources[i].source_wrapper;
+}
+
+void
+IndexCollection::replace(uint32_t id, const IndexSearchable::SP &fs)
+{
+ for (size_t i = 0; i < _sources.size(); ++i) {
+ if (_sources[i].id == id) {
+ _sources[i].source_wrapper = fs;
+ return;
+ }
+ }
+ LOG(warning, "Tried to replace Searchable %d, but it wasn't there.", id);
+ append(id, fs);
+}
+
+const ISourceSelector &
+IndexCollection::getSourceSelector() const
+{
+ return *_source_selector;
+}
+
+size_t
+IndexCollection::getSourceCount() const
+{
+ return _sources.size();
+}
+
+IndexSearchable &
+IndexCollection::getSearchable(uint32_t i) const
+{
+ return *_sources[i].source_wrapper;
+}
+
+uint32_t
+IndexCollection::getSourceId(uint32_t i) const
+{
+ return _sources[i].id;
+}
+
+search::SearchableStats
+IndexCollection::getSearchableStats() const
+{
+ search::SearchableStats stats;
+ for (size_t i = 0; i < _sources.size(); ++i) {
+ stats.merge(_sources[i].source_wrapper->getSearchableStats());
+ }
+ return stats;
+}
+
+search::SerialNum
+IndexCollection::getSerialNum() const
+{
+ search::SerialNum serialNum = 0;
+ for (auto &source : _sources) {
+ serialNum = std::max(serialNum, source.source_wrapper->getSerialNum());
+ }
+ return serialNum;
+}
+
+
+void
+IndexCollection::accept(IndexSearchableVisitor &visitor) const
+{
+ for (auto &source : _sources) {
+ source.source_wrapper->accept(visitor);
+ }
+}
+
+namespace {
+
+struct Mixer {
+ const ISourceSelector &_selector;
+ std::unique_ptr<SourceBlenderBlueprint> _blender;
+
+ Mixer(const ISourceSelector &selector)
+ : _selector(selector), _blender() {}
+
+ void addIndex(Blueprint::UP index) {
+ if ( ! _blender) {
+ _blender = std::make_unique<SourceBlenderBlueprint>(_selector);
+ }
+ _blender->addChild(std::move(index));
+ }
+
+ Blueprint::UP mix() {
+ if (_blender) {
+ return std::move(_blender);
+ }
+ return std::make_unique<EmptyBlueprint>();
+ }
+};
+
+class CreateBlueprintVisitor : public search::query::QueryVisitor {
+private:
+ const IIndexCollection &_indexes;
+ const FieldSpecList &_fields;
+ const IRequestContext &_requestContext;
+ Blueprint::UP _result;
+
+ template <typename NodeType>
+ void visitTerm(NodeType &n) {
+ Mixer mixer(_indexes.getSourceSelector());
+ for (size_t i = 0; i < _indexes.getSourceCount(); ++i) {
+ Blueprint::UP blueprint = _indexes.getSearchable(i).createBlueprint(_requestContext, _fields, n);
+ blueprint->setSourceId(_indexes.getSourceId(i));
+ mixer.addIndex(std::move(blueprint));
+ }
+ _result = mixer.mix();
+ }
+
+ void visit(And &) override { }
+ void visit(AndNot &) override { }
+ void visit(Or &) override { }
+ void visit(WeakAnd &) override { }
+ void visit(Equiv &) override { }
+ void visit(Rank &) override { }
+ void visit(Near &) override { }
+ void visit(ONear &) override { }
+ void visit(SameElement &) override { }
+ void visit(TrueQueryNode &) override {}
+ void visit(FalseQueryNode &) override {}
+
+ void visit(WeightedSetTerm &n) override { visitTerm(n); }
+ void visit(DotProduct &n) override { visitTerm(n); }
+ void visit(WandTerm &n) override { visitTerm(n); }
+ void visit(Phrase &n) override { visitTerm(n); }
+ void visit(NumberTerm &n) override { visitTerm(n); }
+ void visit(LocationTerm &n) override { visitTerm(n); }
+ void visit(PrefixTerm &n) override { visitTerm(n); }
+ void visit(RangeTerm &n) override { visitTerm(n); }
+ void visit(StringTerm &n) override { visitTerm(n); }
+ void visit(SubstringTerm &n) override { visitTerm(n); }
+ void visit(SuffixTerm &n) override { visitTerm(n); }
+ void visit(PredicateQuery &n) override { visitTerm(n); }
+ void visit(RegExpTerm &n) override { visitTerm(n); }
+ void visit(NearestNeighborTerm &n) override { visitTerm(n); }
+ void visit(FuzzyTerm &n) override { visitTerm(n); }
+
+public:
+ CreateBlueprintVisitor(const IIndexCollection &indexes,
+ const FieldSpecList &fields,
+ const IRequestContext & requestContext)
+ : _indexes(indexes),
+ _fields(fields),
+ _requestContext(requestContext),
+ _result() {}
+
+ Blueprint::UP getResult() { return std::move(_result); }
+};
+
+}
+
+Blueprint::UP
+IndexCollection::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term)
+{
+ FieldSpecList fields;
+ fields.add(field);
+ return createBlueprint(requestContext, fields, term);
+}
+
+Blueprint::UP
+IndexCollection::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term)
+{
+ CreateBlueprintVisitor visitor(*this, fields, requestContext);
+ const_cast<Node &>(term).accept(visitor);
+ return visitor.getResult();
+}
+
+FieldLengthInfo
+IndexCollection::get_field_length_info(const vespalib::string& field_name) const
+{
+ if (_sources.empty()) {
+ return FieldLengthInfo();
+ }
+ return _sources.back().source_wrapper->get_field_length_info(field_name);
+}
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/indexcollection.h b/searchcore/src/vespa/searchcorespi/index/indexcollection.h
new file mode 100644
index 00000000000..d9fb8e973e1
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexcollection.h
@@ -0,0 +1,69 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "isearchableindexcollection.h"
+#include <vespa/searchlib/util/searchable_stats.h>
+
+namespace searchcorespi {
+
+/**
+ * Holds a set of index searchables with source ids, and a source selector for
+ * determining which index to use for each document.
+ */
+class IndexCollection : public ISearchableIndexCollection
+{
+ struct SourceWithId {
+ uint32_t id;
+ IndexSearchable::SP source_wrapper;
+
+ SourceWithId(uint32_t id_in, const IndexSearchable::SP &source_in)
+ : id(id_in), source_wrapper(source_in)
+ {}
+ SourceWithId() : id(0), source_wrapper() {}
+ };
+
+ // Selector shared across memory dumps, replaced on disk fusion operations
+ using ISourceSelectorSP = std::shared_ptr<ISourceSelector>;
+ ISourceSelectorSP _source_selector;
+ std::vector<SourceWithId> _sources;
+
+public:
+ IndexCollection(const ISourceSelectorSP & selector);
+ IndexCollection(const ISourceSelectorSP & selector, const ISearchableIndexCollection &sources);
+ ~IndexCollection();
+
+ void append(uint32_t id, const IndexSearchable::SP &source) override;
+ void replace(uint32_t id, const IndexSearchable::SP &source) override;
+ IndexSearchable::SP getSearchableSP(uint32_t i) const override;
+ void setSource(uint32_t docId) override;
+
+
+ // Implements IIndexCollection
+ const ISourceSelector &getSourceSelector() const override;
+ size_t getSourceCount() const override;
+ IndexSearchable &getSearchable(uint32_t i) const override;
+ uint32_t getSourceId(uint32_t i) const override;
+
+ // Implements IndexSearchable
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext, const FieldSpec &field, const Node &term) override;
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext, const FieldSpecList &fields, const Node &term) override;
+ search::SearchableStats getSearchableStats() const override;
+ search::SerialNum getSerialNum() const override;
+ void accept(IndexSearchableVisitor &visitor) const override;
+
+ static ISearchableIndexCollection::UP
+ replaceAndRenumber(const ISourceSelectorSP & selector, const ISearchableIndexCollection &fsc,
+ uint32_t id_diff, const IndexSearchable::SP &new_source);
+
+ // Implements IFieldLengthInspector
+ /**
+ * Returns field length info from the newest disk index, or empty info for all fields if no disk index exists.
+ */
+ search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp
new file mode 100644
index 00000000000..c701d1dfb1d
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp
@@ -0,0 +1,77 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexdisklayout.h"
+#include "index_disk_dir.h"
+#include <sstream>
+
+namespace searchcorespi::index {
+
+const vespalib::string
+IndexDiskLayout::FlushDirPrefix = vespalib::string("index.flush.");
+
+const vespalib::string
+IndexDiskLayout::FusionDirPrefix = vespalib::string("index.fusion.");
+
+const vespalib::string
+IndexDiskLayout::SerialNumTag = vespalib::string("Serial num");
+
+IndexDiskLayout::IndexDiskLayout(const vespalib::string &baseDir)
+ : _baseDir(baseDir)
+{
+}
+
+vespalib::string
+IndexDiskLayout::getFlushDir(uint32_t sourceId) const
+{
+ std::ostringstream ost;
+ ost << _baseDir << "/" << FlushDirPrefix << sourceId;
+ return ost.str();
+}
+
+vespalib::string
+IndexDiskLayout::getFusionDir(uint32_t sourceId) const
+{
+ std::ostringstream ost;
+ ost << _baseDir << "/" << FusionDirPrefix << sourceId;
+ return ost.str();
+}
+
+vespalib::string
+IndexDiskLayout::getSerialNumFileName(const vespalib::string &dir)
+{
+ return dir + "/serial.dat";
+}
+
+vespalib::string
+IndexDiskLayout::getSchemaFileName(const vespalib::string &dir)
+{
+ return dir + "/schema.txt";
+}
+
+vespalib::string
+IndexDiskLayout::getSelectorFileName(const vespalib::string &dir)
+{
+ return dir + "/selector";
+}
+
+IndexDiskDir
+IndexDiskLayout::get_index_disk_dir(const vespalib::string& dir)
+{
+ auto name = dir.substr(dir.rfind('/') + 1);
+ const vespalib::string* prefix = nullptr;
+ bool fusion = false;
+ if (name.find(FlushDirPrefix) == 0) {
+ prefix = &FlushDirPrefix;
+ } else if (name.find(FusionDirPrefix) == 0) {
+ prefix = &FusionDirPrefix;
+ fusion = true;
+ } else {
+ return IndexDiskDir(); // invalid
+ }
+ std::istringstream ist(name.substr(prefix->size()));
+ uint32_t id = 0;
+ ist >> id;
+ return IndexDiskDir(id, fusion); // invalid if id == 0u
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h
new file mode 100644
index 00000000000..94b35936cc7
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+class IndexDiskDir;
+
+/**
+ * Utility class used to get static aspects of the disk layout (i.e directory and file names)
+ * needed by the index maintainer.
+ */
+class IndexDiskLayout {
+public:
+ static const vespalib::string FlushDirPrefix;
+ static const vespalib::string FusionDirPrefix;
+ static const vespalib::string SerialNumTag;
+
+private:
+ vespalib::string _baseDir;
+
+public:
+ IndexDiskLayout(const vespalib::string &baseDir);
+ vespalib::string getFlushDir(uint32_t sourceId) const;
+ vespalib::string getFusionDir(uint32_t sourceId) const;
+ static IndexDiskDir get_index_disk_dir(const vespalib::string& dir);
+
+ static vespalib::string getSerialNumFileName(const vespalib::string &dir);
+ static vespalib::string getSchemaFileName(const vespalib::string &dir);
+ static vespalib::string getSelectorFileName(const vespalib::string &dir);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
new file mode 100644
index 00000000000..e72525d0aaa
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -0,0 +1,83 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexflushtarget.h"
+#include <vespa/vespalib/util/size_literals.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexflushtarget");
+
+namespace searchcorespi::index {
+
+IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats)
+ : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX),
+ _indexMaintainer(indexMaintainer),
+ _flushStats(flushStats),
+ _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()),
+ _maxFrozenMemoryIndexes(indexMaintainer.getMaxFrozenMemoryIndexes()),
+ _lastStats()
+{
+ _lastStats.setPathElementsToLog(7);
+}
+
+IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer)
+ : IndexFlushTarget(indexMaintainer, indexMaintainer.getFlushStats())
+{}
+
+IndexFlushTarget::~IndexFlushTarget() = default;
+
+IFlushTarget::MemoryGain
+IndexFlushTarget::getApproxMemoryGain() const
+{
+ return MemoryGain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes);
+}
+
+IFlushTarget::DiskGain
+IndexFlushTarget::getApproxDiskGain() const
+{
+ return DiskGain(0, 0);
+}
+
+bool
+IndexFlushTarget::needUrgentFlush() const
+{
+ // Due to limitation of 16G address space of single datastore
+ // TODO: Even better if urgency was decided by memory index itself.
+ bool urgent = (_numFrozenMemoryIndexes > _maxFrozenMemoryIndexes) ||
+ (getApproxMemoryGain().gain() > ssize_t(16_Gi));
+ SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum();
+ LOG(debug, "Num frozen: %u Memory gain: %" PRId64 " Urgent: %d, flushedSerial=%" PRIu64,
+ _numFrozenMemoryIndexes, getApproxMemoryGain().gain(), static_cast<int>(urgent), flushedSerial);
+ return urgent;
+}
+
+IFlushTarget::Time
+IndexFlushTarget::getLastFlushTime() const
+{
+ return _indexMaintainer.getLastFlushTime();
+}
+
+IFlushTarget::SerialNum
+IndexFlushTarget::getFlushedSerialNum() const
+{
+ return _indexMaintainer.getFlushedSerialNum();
+}
+
+IFlushTarget::Task::UP
+IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>)
+{
+ // the target must live until this task is done (handled by flush engine).
+ return _indexMaintainer.initFlush(serialNum, &_lastStats);
+}
+
+uint64_t
+IndexFlushTarget::getApproxBytesToWriteToDisk() const
+{
+ MemoryGain gain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes);
+ if (gain.getAfter() < gain.getBefore()) {
+ return gain.getBefore() - gain.getAfter();
+ } else {
+ return 0;
+ }
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h
new file mode 100644
index 00000000000..2b9ecc9574b
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "indexmaintainer.h"
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Flush target for flushing a memory index in an IndexMaintainer.
+ **/
+class IndexFlushTarget : public IFlushTarget {
+private:
+ IndexMaintainer &_indexMaintainer;
+ const IndexMaintainer::FlushStats _flushStats;
+ uint32_t _numFrozenMemoryIndexes;
+ uint32_t _maxFrozenMemoryIndexes;
+ FlushStats _lastStats;
+
+public:
+ explicit IndexFlushTarget(IndexMaintainer &indexMaintainer);
+ IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats);
+ ~IndexFlushTarget() override;
+
+ // Implements IFlushTarget
+ MemoryGain getApproxMemoryGain() const override;
+ DiskGain getApproxDiskGain() const override;
+ SerialNum getFlushedSerialNum() const override;
+ Time getLastFlushTime() const override;
+
+ bool needUrgentFlush() const override;
+
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
+ FlushStats getLastFlushStats() const override { return _lastStats; }
+ uint64_t getApproxBytesToWriteToDisk() const override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
new file mode 100644
index 00000000000..1df6d321f99
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -0,0 +1,102 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexfusiontarget.h"
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexfusiontarget");
+
+namespace searchcorespi::index {
+
+using search::SerialNum;
+namespace {
+
+class Fusioner : public FlushTask {
+private:
+ IndexMaintainer &_indexMaintainer;
+ FlushStats &_stats;
+ SerialNum _serialNum;
+ std::shared_ptr<search::IFlushToken> _flush_token;
+public:
+ Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) :
+ _indexMaintainer(indexMaintainer),
+ _stats(stats),
+ _serialNum(serialNum),
+ _flush_token(std::move(flush_token))
+ {}
+
+ void run() override {
+ vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token);
+ // the target must live until this task is done (handled by flush engine).
+ _stats.setPath(outputFusionDir);
+ }
+
+ SerialNum getFlushSerial() const override {
+ return 0u; // Zero means that no tls syncing is needed
+ }
+};
+
+}
+IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer)
+ : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX),
+ _indexMaintainer(indexMaintainer),
+ _fusionStats(indexMaintainer.getFusionStats()),
+ _lastStats()
+{
+ _lastStats.setPathElementsToLog(7);
+ LOG(debug, "New target, Num flushed: %d, Disk usage: %" PRIu64, _fusionStats.numUnfused, _fusionStats.diskUsage);
+}
+
+IndexFusionTarget::~IndexFusionTarget() = default;
+
+IFlushTarget::MemoryGain
+IndexFusionTarget::getApproxMemoryGain() const
+{
+ return MemoryGain(0, 0);
+}
+
+IFlushTarget::DiskGain
+IndexFusionTarget::getApproxDiskGain() const
+{
+ uint64_t diskUsageBefore = _fusionStats.diskUsage;
+ uint64_t diskUsageGain = static_cast<uint64_t>((0.1 * (diskUsageBefore * std::max(0,static_cast<int>(_fusionStats.numUnfused - 1)))));
+ diskUsageGain = std::min(diskUsageGain, diskUsageBefore);
+ if (!_fusionStats._canRunFusion)
+ diskUsageGain = 0;
+ return DiskGain(diskUsageBefore, diskUsageBefore - diskUsageGain);
+}
+
+bool
+IndexFusionTarget::needUrgentFlush() const
+{
+ bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed) && (_fusionStats._canRunFusion);
+ LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent);
+ return urgent;
+}
+
+IFlushTarget::Time
+IndexFusionTarget::getLastFlushTime() const
+{
+ return vespalib::system_clock::now();
+}
+
+IFlushTarget::SerialNum
+IndexFusionTarget::getFlushedSerialNum() const
+{
+ // Lack of fusion operation doesn't prevent transaction log
+ // pruning.
+ return _indexMaintainer.getCurrentSerialNum();
+}
+
+IFlushTarget::Task::UP
+IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
+{
+ return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token));
+}
+
+uint64_t
+IndexFusionTarget::getApproxBytesToWriteToDisk() const
+{
+ return _fusionStats.diskUsage;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h
new file mode 100644
index 00000000000..7a9f44e6612
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "indexmaintainer.h"
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Flush target for doing fusion on disk indexes in an IndexMaintainer.
+ **/
+class IndexFusionTarget : public IFlushTarget {
+private:
+ IndexMaintainer &_indexMaintainer;
+ IndexMaintainer::FusionStats _fusionStats;
+ FlushStats _lastStats;
+
+public:
+ IndexFusionTarget(IndexMaintainer &indexMaintainer);
+ ~IndexFusionTarget() override;
+
+ // Implements IFlushTarget
+ MemoryGain getApproxMemoryGain() const override;
+ DiskGain getApproxDiskGain() const override;
+ SerialNum getFlushedSerialNum() const override;
+ Time getLastFlushTime() const override;
+ bool needUrgentFlush() const override;
+
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override;
+ FlushStats getLastFlushStats() const override { return _lastStats; }
+ uint64_t getApproxBytesToWriteToDisk() const override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
new file mode 100644
index 00000000000..af273bc5e45
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -0,0 +1,1363 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "diskindexcleaner.h"
+#include "eventlogger.h"
+#include "fusionrunner.h"
+#include "indexflushtarget.h"
+#include "indexfusiontarget.h"
+#include "indexmaintainer.h"
+#include "indexreadutilities.h"
+#include "indexwriteutilities.h"
+#include "index_disk_dir.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/searchcorespi/flush/lambdaflushtask.h>
+#include <vespa/searchlib/common/i_flush_token.h>
+#include <vespa/searchlib/index/schemautil.h>
+#include <vespa/searchlib/util/dirtraverse.h>
+#include <vespa/searchlib/util/filekit.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/array.hpp>
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/gate.h>
+#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/fastos/file.h>
+#include <sstream>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexmaintainer");
+
+using document::Document;
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::index::Schema;
+using search::index::SchemaUtil;
+using search::common::FileHeaderContext;
+using search::queryeval::ISourceSelector;
+using search::queryeval::Source;
+using search::SerialNum;
+using vespalib::makeLambdaTask;
+using vespalib::makeSharedLambdaCallback;
+using std::ostringstream;
+using vespalib::string;
+using vespalib::Executor;
+using vespalib::Runnable;
+using vespalib::IDestructorCallback;
+
+namespace searchcorespi::index {
+
+using Configure = IIndexManager::Configure;
+using Reconfigurer = IIndexManager::Reconfigurer;
+
+namespace {
+
+class ReconfigRunnable : public Runnable {
+public:
+ bool &_result;
+ Reconfigurer &_reconfigurer;
+ std::unique_ptr<Configure> _configure;
+
+ ReconfigRunnable(bool &result, Reconfigurer &reconfigurer, std::unique_ptr<Configure> configure)
+ : _result(result),
+ _reconfigurer(reconfigurer),
+ _configure(std::move(configure))
+ { }
+
+ void run() override {
+ _result = _reconfigurer.reconfigure(std::move(_configure));
+ }
+};
+
+class ReconfigRunnableTask : public Executor::Task {
+private:
+ Reconfigurer &_reconfigurer;
+ std::unique_ptr<Configure> _configure;
+public:
+ ReconfigRunnableTask(Reconfigurer &reconfigurer, std::unique_ptr<Configure> configure)
+ : _reconfigurer(reconfigurer),
+ _configure(std::move(configure))
+ { }
+ ~ReconfigRunnableTask() override;
+ void run() override {
+ _reconfigurer.reconfigure(std::move(_configure));
+ }
+};
+
+ReconfigRunnableTask::~ReconfigRunnableTask() = default;
+
+SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max();
+
+
+class DiskIndexWithDestructorCallback : public IDiskIndex {
+private:
+ std::shared_ptr<IDestructorCallback> _callback;
+ IDiskIndex::SP _index;
+ IndexDiskDir _index_disk_dir;
+ IndexDiskLayout& _layout;
+ DiskIndexes& _disk_indexes;
+
+public:
+ DiskIndexWithDestructorCallback(IDiskIndex::SP index,
+ std::shared_ptr<IDestructorCallback> callback,
+ IndexDiskLayout& layout,
+ DiskIndexes& disk_indexes) noexcept
+ : _callback(std::move(callback)),
+ _index(std::move(index)),
+ _index_disk_dir(IndexDiskLayout::get_index_disk_dir(_index->getIndexDir())),
+ _layout(layout),
+ _disk_indexes(disk_indexes)
+ {
+ }
+ ~DiskIndexWithDestructorCallback() override;
+ const IDiskIndex &getWrapped() const { return *_index; }
+
+ /**
+ * Implements searchcorespi::IndexSearchable
+ */
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term) override
+ {
+ FieldSpecList fsl;
+ fsl.add(field);
+ return _index->createBlueprint(requestContext, fsl, term);
+ }
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term) override
+ {
+ return _index->createBlueprint(requestContext, fields, term);
+ }
+ search::SearchableStats getSearchableStats() const override;
+ search::SerialNum getSerialNum() const override {
+ return _index->getSerialNum();
+ }
+ void accept(IndexSearchableVisitor &visitor) const override {
+ _index->accept(visitor);
+ }
+
+ /**
+ * Implements IFieldLengthInspector
+ */
+ search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override {
+ return _index->get_field_length_info(field_name);
+ }
+
+ /**
+ * Implements IDiskIndex
+ */
+ const vespalib::string &getIndexDir() const override { return _index->getIndexDir(); }
+ const search::index::Schema &getSchema() const override { return _index->getSchema(); }
+
+};
+
+DiskIndexWithDestructorCallback::~DiskIndexWithDestructorCallback() = default;
+
+search::SearchableStats
+DiskIndexWithDestructorCallback::getSearchableStats() const
+{
+ auto stats = _index->getSearchableStats();
+ uint64_t transient_size = _disk_indexes.get_transient_size(_layout, _index_disk_dir);
+ stats.fusion_size_on_disk(transient_size);
+ return stats;
+}
+
+
+} // namespace
+
+IndexMaintainer::FrozenMemoryIndexRef::~FrozenMemoryIndexRef() = default;
+
+IndexMaintainer::FusionArgs::FusionArgs()
+ : _new_fusion_id(0u),
+ _changeGens(),
+ _schema(),
+ _prunedSchema(),
+ _old_source_list()
+{ }
+
+IndexMaintainer::FusionArgs::~FusionArgs() = default;
+
+IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default;
+IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default;
+
+uint32_t
+IndexMaintainer::getNewAbsoluteId()
+{
+ return _next_id++;
+}
+
+string
+IndexMaintainer::getFlushDir(uint32_t sourceId) const
+{
+ return _layout.getFlushDir(sourceId);
+}
+
+string
+IndexMaintainer::getFusionDir(uint32_t sourceId) const
+{
+ return _layout.getFusionDir(sourceId);
+}
+
+bool
+IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll)
+{
+ bool hasReopenedAnything(false);
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ uint32_t count = coll.getSourceCount();
+ for (uint32_t i = 0; i < count; ++i) {
+ IndexSearchable &is = coll.getSearchable(i);
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d == nullptr) {
+ continue; // not a disk index
+ }
+ const string indexDir = d->getIndexDir();
+ vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir);
+ Schema trimmedSchema;
+ if (!trimmedSchema.loadFromFile(schemaName)) {
+ LOG(error, "Could not open schema '%s'", schemaName.c_str());
+ }
+ if (trimmedSchema != d->getSchema()) {
+ IDiskIndex::SP newIndex(reloadDiskIndex(*d));
+ coll.replace(coll.getSourceId(i), newIndex);
+ hasReopenedAnything = true;
+ }
+ }
+ return hasReopenedAnything;
+}
+
+void
+IndexMaintainer::updateDiskIndexSchema(const vespalib::string &indexDir,
+ const Schema &schema,
+ SerialNum serialNum)
+{
+ // Called by a flush worker thread OR document db executor thread
+ LockGuard lock(_schemaUpdateLock);
+ IndexWriteUtilities::updateDiskIndexSchema(indexDir, schema, serialNum);
+}
+
+void
+IndexMaintainer::updateIndexSchemas(IIndexCollection &coll,
+ const Schema &schema,
+ SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ uint32_t count = coll.getSourceCount();
+ for (uint32_t i = 0; i < count; ++i) {
+ IndexSearchable &is = coll.getSearchable(i);
+ const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is);
+ if (d == nullptr) {
+ IMemoryIndex *const m = dynamic_cast<IMemoryIndex *>(&is);
+ if (m != nullptr) {
+ m->pruneRemovedFields(schema);
+ }
+ continue;
+ }
+ updateDiskIndexSchema(d->getIndexDir(), schema, serialNum);
+ }
+}
+
+void
+IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ for (;;) {
+ Schema::SP activeFusionSchema;
+ Schema::SP activeFusionPrunedSchema;
+ Schema::SP newActiveFusionPrunedSchema;
+ {
+ LockGuard lock(_state_lock);
+ activeFusionSchema = _activeFusionSchema;
+ activeFusionPrunedSchema = _activeFusionPrunedSchema;
+ }
+ if (!activeFusionSchema)
+ return; // No active fusion
+ if (!activeFusionPrunedSchema) {
+ Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema);
+ newActiveFusionPrunedSchema = std::move(newSchema);
+ } else {
+ Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema);
+ newActiveFusionPrunedSchema = std::move(newSchema);
+ }
+ {
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ if (activeFusionSchema == _activeFusionSchema &&
+ activeFusionPrunedSchema == _activeFusionPrunedSchema)
+ {
+ _activeFusionPrunedSchema = newActiveFusionPrunedSchema;
+ break;
+ }
+ }
+ }
+}
+
+void
+IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir)
+{
+ _disk_indexes->notActive(indexDir);
+ removeOldDiskIndexes();
+}
+
+IDiskIndex::SP
+IndexMaintainer::loadDiskIndex(const string &indexDir)
+{
+ // Called by a flush worker thread OR CTOR (in document db init executor thread)
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadStart(indexDir);
+ }
+ vespalib::Timer timer;
+ auto index = _operations.loadDiskIndex(indexDir);
+ auto stats = index->getSearchableStats();
+ _disk_indexes->setActive(indexDir, stats.sizeOnDisk());
+ auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
+ std::move(index),
+ makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
+ _layout, *_disk_indexes);
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
+ }
+ return retval;
+}
+
+IDiskIndex::SP
+IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
+{
+ // Called by a flush worker thread OR document db executor thread
+ const string indexDir = oldIndex.getIndexDir();
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadStart(indexDir);
+ }
+ vespalib::Timer timer;
+ const IDiskIndex &wrappedDiskIndex = (dynamic_cast<const DiskIndexWithDestructorCallback &>(oldIndex)).getWrapped();
+ auto index = _operations.reloadDiskIndex(wrappedDiskIndex);
+ auto stats = index->getSearchableStats();
+ _disk_indexes->setActive(indexDir, stats.sizeOnDisk());
+ auto retval = std::make_shared<DiskIndexWithDestructorCallback>(
+ std::move(index),
+ makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }),
+ _layout, *_disk_indexes);
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed()));
+ }
+ return retval;
+}
+
+IDiskIndex::SP
+IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex,
+ uint32_t indexId,
+ uint32_t docIdLimit,
+ SerialNum serialNum,
+ FixedSourceSelector::SaveInfo &saveInfo)
+{
+ // Called by a flush worker thread
+ const string flushDir = getFlushDir(indexId);
+ memoryIndex.flushToDisk(flushDir, docIdLimit, serialNum);
+ Schema::SP prunedSchema(memoryIndex.getPrunedSchema());
+ if (prunedSchema) {
+ updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh);
+ }
+ IndexWriteUtilities::writeSourceSelector(saveInfo, indexId, getAttrTune(),
+ _ctx.getFileHeaderContext(), serialNum);
+ IndexWriteUtilities::writeSerialNum(serialNum, flushDir, _ctx.getFileHeaderContext());
+ return loadDiskIndex(flushDir);
+}
+
+ISearchableIndexCollection::UP
+IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList)
+{
+ // Called by CTOR (in document db init executor thread)
+ uint32_t fusion_id = spec.last_fusion_id;
+ if (fusion_id != 0) {
+ sourceList->append(0, loadDiskIndex(getFusionDir(fusion_id)));
+ }
+ for (size_t i = 0; i < spec.flush_ids.size(); ++i) {
+ const uint32_t id = spec.flush_ids[i];
+ const uint32_t relative_id = id - fusion_id;
+ sourceList->append(relative_id, loadDiskIndex(getFlushDir(id)));
+ }
+ return sourceList;
+}
+
+namespace {
+
+ using LockGuard = std::lock_guard<std::mutex>;
+
+ISearchableIndexCollection::SP
+getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false)
+{
+ if (dynamic_cast<const WarmupIndexCollection *>(is.get()) != nullptr) {
+ if (warn) {
+ LOG(info, "Already warming up an index '%s'. Start using it immediately."
+ " This is an indication that you have configured your warmup interval too long.",
+ is->toString().c_str());
+ }
+ const WarmupIndexCollection & wic(dynamic_cast<const WarmupIndexCollection &>(*is));
+ return getLeaf(newSearchLock, wic.getNextIndexCollection(), warn);
+ } else {
+ return is;
+ }
+}
+
+}
+
+/*
+ * Caller must hold _state_lock (SL).
+ */
+void
+IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard lock(_new_search_lock);
+ ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock);
+ indexes->replace(sourceId, source);
+ swapInNewIndex(lock, std::move(indexes), *source);
+}
+
+/*
+ * Caller must hold _state_lock (SL) and _new_search_lock (NSL), the latter
+ * passed as guard.
+ */
+void
+IndexMaintainer::swapInNewIndex(LockGuard & guard,
+ ISearchableIndexCollection::SP indexes,
+ IndexSearchable & source)
+{
+ assert(indexes->valid());
+ (void) guard;
+ if (_warmupConfig.getDuration() > vespalib::duration::zero()) {
+ if (dynamic_cast<const IDiskIndex *>(&source) != nullptr) {
+ LOG(debug, "Warming up a disk index.");
+ indexes = std::make_shared<WarmupIndexCollection>
+ (_warmupConfig, getLeaf(guard, _source_list, true), indexes,
+ static_cast<IDiskIndex &>(source), _ctx.getWarmupExecutor(),
+ _ctx.getThreadingService().clock(), *this);
+ } else {
+ LOG(debug, "No warmup needed as it is a memory index that is mapped in.");
+ }
+ }
+ LOG(debug, "Replacing indexcollection :\n%s\nwith\n%s", _source_list->toString().c_str(), indexes->toString().c_str());
+ assert(indexes->valid());
+ _source_list = std::move(indexes);
+}
+
+/*
+ * Caller must hold _state_lock (SL).
+ */
+void
+IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard lock(_new_search_lock);
+ ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock);
+ indexes->append(sourceId, source);
+ swapInNewIndex(lock, std::move(indexes), *source);
+}
+
+ISearchableIndexCollection::UP
+IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock)
+{
+ ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list));
+ return std::make_unique<IndexCollection>(_selector, *currentLeaf);
+}
+
+IndexMaintainer::FlushArgs::FlushArgs()
+ : old_index(),
+ old_absolute_id(0),
+ old_source_list(),
+ save_info(),
+ flush_serial_num(),
+ stats(nullptr),
+ _skippedEmptyLast(false),
+ _extraIndexes(),
+ _changeGens(),
+ _prunedSchema()
+{
+}
+IndexMaintainer::FlushArgs::~FlushArgs() = default;
+IndexMaintainer::FlushArgs::FlushArgs(FlushArgs &&) = default;
+IndexMaintainer::FlushArgs & IndexMaintainer::FlushArgs::operator=(FlushArgs &&) = default;
+
+bool
+IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
+{
+ // Called by initFlush via reconfigurer
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard state_lock(_state_lock);
+ args->old_index = _current_index;
+ args->old_absolute_id = _current_index_id + _last_fusion_id;
+ args->old_source_list = _source_list;
+ string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id));
+ args->flush_serial_num = current_serial_num();
+ {
+ LockGuard lock(_index_update_lock);
+ // Handover of extra memory indexes to flush
+ args->_extraIndexes = _frozenMemoryIndexes;
+ _frozenMemoryIndexes.clear();
+ }
+
+ LOG(debug, "Flushing. Id = %u. Serial num = %llu",
+ args->old_absolute_id, (unsigned long long) args->flush_serial_num);
+ {
+ LockGuard lock(_index_update_lock);
+ if (!_current_index->hasReceivedDocumentInsert() &&
+ _source_selector_changes == 0 &&
+ !_flush_empty_current_index)
+ {
+ args->_skippedEmptyLast = true; // Skip flush of empty memory index
+ }
+
+ if (!args->_skippedEmptyLast) {
+ // Keep on using same source selector with extended valid range
+ args->save_info = getSourceSelector().extractSaveInfo(selector_name);
+ // XXX: Overflow issue in source selector
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id;
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ _selector->setDefaultSource(_current_index_id);
+ _source_selector_changes = 0;
+ }
+ _current_index = *new_index;
+ _flush_empty_current_index = false;
+ }
+ if (args->_skippedEmptyLast) {
+ replaceSource(_current_index_id, _current_index);
+ } else {
+ appendSource(_current_index_id, _current_index);
+ }
+ _source_list->setCurrentIndex(_current_index_id);
+ return true;
+}
+
+void
+IndexMaintainer::doFlush(FlushArgs args)
+{
+ // Called by a flush worker thread
+ FlushIds flushIds; // Absolute ids of flushed indexes
+
+ flushFrozenMemoryIndexes(args, flushIds);
+
+ if (!args._skippedEmptyLast) {
+ flushLastMemoryIndex(args, flushIds);
+ }
+
+ assert(!flushIds.empty());
+ if (args.stats != nullptr) {
+ updateFlushStats(args);
+ }
+
+ scheduleFusion(flushIds);
+}
+
+void
+IndexMaintainer::flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds)
+{
+ // Called by a flush worker thread
+ for (FrozenMemoryIndexRef & frozen : args._extraIndexes) {
+ assert(frozen._absoluteId < args.old_absolute_id);
+ assert(flushIds.empty() || flushIds.back() < frozen._absoluteId);
+
+ FlushArgs eArgs;
+ eArgs.old_index = frozen._index;
+ eArgs.flush_serial_num = frozen._serialNum;
+ eArgs.old_absolute_id = frozen._absoluteId;
+ const uint32_t docIdLimit = frozen._saveInfo->getHeader()._docIdLimit;
+
+ flushMemoryIndex(eArgs, docIdLimit, *frozen._saveInfo, flushIds);
+
+ // Drop references to old memory index and old save info.
+ frozen._index.reset();
+ frozen._saveInfo.reset();
+ }
+}
+
+void
+IndexMaintainer::flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds)
+{
+ // Called by a flush worker thread
+ const uint32_t docIdLimit = args.save_info->getHeader()._docIdLimit;
+ flushMemoryIndex(args, docIdLimit, *args.save_info, flushIds);
+}
+
+void
+IndexMaintainer::updateFlushStats(const FlushArgs &args)
+{
+ // Called by a flush worker thread
+ vespalib::string flushDir;
+ if (!args._skippedEmptyLast) {
+ flushDir = getFlushDir(args.old_absolute_id);
+ } else {
+ assert(!args._extraIndexes.empty());
+ flushDir = getFlushDir(args._extraIndexes.back()._absoluteId);
+ }
+ args.stats->setPath(flushDir);
+}
+
+void
+IndexMaintainer::flushMemoryIndex(FlushArgs &args,
+ uint32_t docIdLimit,
+ FixedSourceSelector::SaveInfo &saveInfo,
+ FlushIds &flushIds)
+{
+ // Called by a flush worker thread
+ ChangeGens changeGens = getChangeGens();
+ IMemoryIndex &memoryIndex = *args.old_index;
+ Schema::SP prunedSchema = memoryIndex.getPrunedSchema();
+ IDiskIndex::SP diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id,
+ docIdLimit, args.flush_serial_num,
+ saveInfo);
+ // Post processing after memory index has been written to disk and
+ // opened as disk index.
+ args._changeGens = changeGens;
+ args._prunedSchema = prunedSchema;
+ reconfigureAfterFlush(args, diskIndex);
+
+ flushIds.push_back(args.old_absolute_id);
+}
+
+
+void
+IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex)
+{
+ // Called by a flush worker thread
+ for (;;) {
+ // Call reconfig closure for this change
+ auto configure = makeLambdaConfigure([this, argsP=&args, diskIndexP=&diskIndex]() {
+ return doneFlush(argsP, diskIndexP);
+ });
+ if (reconfigure(std::move(configure))) {
+ return;
+ }
+ ChangeGens changeGens = getChangeGens();
+ Schema::SP prunedSchema = args.old_index->getPrunedSchema();
+ const string indexDir = getFlushDir(args.old_absolute_id);
+ if (prunedSchema) {
+ updateDiskIndexSchema(indexDir, *prunedSchema, noSerialNumHigh);
+ }
+ IDiskIndex::SP reloadedDiskIndex = reloadDiskIndex(*diskIndex);
+ diskIndex = reloadedDiskIndex;
+ args._changeGens = changeGens;
+ args._prunedSchema = prunedSchema;
+ }
+}
+
+
+bool
+IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) {
+ // Called by doFlush via reconfigurer
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard state_lock(_state_lock);
+ IMemoryIndex &memoryIndex = *args->old_index;
+ if (args->_changeGens != getChangeGens()) {
+ return false; // Must retry operation
+ }
+ if (args->_prunedSchema != memoryIndex.getPrunedSchema()) {
+ return false; // Must retry operation
+ }
+ set_flush_serial_num(std::max(flush_serial_num(), args->flush_serial_num));
+ vespalib::system_time timeStamp = search::FileKit::getModificationTime((*disk_index)->getIndexDir());
+ _lastFlushTime = timeStamp > _lastFlushTime ? timeStamp : _lastFlushTime;
+ const uint32_t old_id = args->old_absolute_id - _last_fusion_id;
+ replaceSource(old_id, *disk_index);
+ return true;
+}
+
+void
+IndexMaintainer::scheduleFusion(const FlushIds &flushIds)
+{
+ // Called by a flush worker thread
+ LOG(debug, "Scheduled fusion for id %u.", flushIds.back());
+ LockGuard guard(_fusion_lock);
+ for (uint32_t id : flushIds) {
+ _fusion_spec.flush_ids.push_back(id);
+ }
+}
+
+bool
+IndexMaintainer::canRunFusion(const FusionSpec &spec) const
+{
+ return spec.flush_ids.size() > 1 ||
+ (spec.flush_ids.size() > 0 && spec.last_fusion_id != 0);
+}
+
+bool
+IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
+{
+ // Called by runFusion via reconfigurer
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard state_lock(_state_lock);
+ if (args->_changeGens != getChangeGens()) {
+ return false; // Must retry operation
+ }
+ if (args->_prunedSchema != getActiveFusionPrunedSchema()) {
+ return false; // Must retry operation
+ }
+ args->_old_source_list = _source_list; // delays destruction
+ uint32_t id_diff = args->_new_fusion_id - _last_fusion_id;
+ ostringstream ost;
+ ost << "sourceselector_fusion(" << args->_new_fusion_id << ")";
+ {
+ LockGuard lock(_index_update_lock);
+
+ // make new source selector with shifted values.
+ _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff);
+ _source_selector_changes = 0;
+ _current_index_id -= id_diff;
+ _last_fusion_id = args->_new_fusion_id;
+ _selector->setBaseId(_last_fusion_id);
+ _activeFusionSchema.reset();
+ _activeFusionPrunedSchema.reset();
+ }
+
+ ISearchableIndexCollection::SP currentLeaf;
+ {
+ LockGuard lock(_new_search_lock);
+ currentLeaf = getLeaf(lock, _source_list);
+ }
+ ISearchableIndexCollection::UP fsc =
+ IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index);
+ fsc->setCurrentIndex(_current_index_id);
+
+ {
+ LockGuard lock(_new_search_lock);
+ swapInNewIndex(lock, std::move(fsc), **new_index);
+ }
+ return true;
+}
+
+bool
+IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive)
+{
+ // called by warmupDone via reconfigurer, warmupDone() doesn't wait for us
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ ISearchableIndexCollection::SP warmIndex;
+ {
+ LockGuard state_lock(_state_lock);
+ if (keepAlive == _source_list) {
+ LockGuard lock(_new_search_lock);
+ warmIndex = (getLeaf(lock, _source_list, false));
+ _source_list = warmIndex;
+ }
+ }
+ if (warmIndex) {
+ LOG(info, "New index warmed up and switched in : %s", warmIndex->toString().c_str());
+ }
+ LOG(info, "Sync warmupExecutor.");
+ keepAlive->drainPending();
+ LOG(info, "Now the keep alive of the warmupindexcollection should be gone.");
+ return true;
+}
+
+void
+IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current)
+{
+ // Called by a search thread
+ LockGuard lock(_new_search_lock);
+ if (current == _source_list) {
+ auto makeSure = makeLambdaConfigure([this, collection=std::move(current)]() {
+ return makeSureAllRemainingWarmupIsDone(std::move(collection));
+ });
+ auto task = std::make_unique<ReconfigRunnableTask>(_ctx.getReconfigurer(), std::move(makeSure));
+ _ctx.getThreadingService().master().execute(std::move(task));
+ } else {
+ LOG(warning, "There has arrived a new IndexCollection while replacing the active index. "
+ "It can theoretically happen, but not very likely, so logging this as a warning.");
+ }
+}
+
+namespace {
+
+bool
+has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema)
+{
+ for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) {
+ if (itr.hasMatchingOldFields(old_schema) &&
+ !itr.has_matching_use_interleaved_features(old_schema))
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
+}
+
+
+void
+IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
+ LockGuard state_lock(_state_lock);
+ typedef FixedSourceSelector::SaveInfo SaveInfo;
+ args._oldSchema = _schema; // Delay destruction
+ args._oldIndex = _current_index; // Delay destruction
+ args._oldSourceList = _source_list; // Delay destruction
+ uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id;
+ string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId));
+ SerialNum freezeSerialNum = current_serial_num();
+ bool dropEmptyLast = false;
+ SaveInfo::UP saveInfo;
+
+ LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum);
+ {
+ LockGuard lock(_index_update_lock);
+ _schema = args._newSchema;
+ if (!_current_index->hasReceivedDocumentInsert()) {
+ dropEmptyLast = true; // Skip flush of empty memory index
+ }
+
+ if (!dropEmptyLast) {
+ // Keep on using same source selector with extended valid range
+ saveInfo = getSourceSelector().extractSaveInfo(selectorName);
+ // XXX: Overflow issue in source selector
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id;
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ _selector->setDefaultSource(_current_index_id);
+ // Extra index to flush next time flushing is performed
+ _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId);
+ }
+ _current_index = newIndex;
+ // Non-matching interleaved features in schemas means that we need to
+ // reconstruct or drop interleaved features in posting lists.
+ // If so, we must flush the new index to disk even if it is empty.
+ // This ensures that 2x triggerFlush will run fusion
+ // to reconstruct or drop interleaved features in the posting lists.
+ _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema);
+ }
+ if (dropEmptyLast) {
+ replaceSource(_current_index_id, _current_index);
+ } else {
+ appendSource(_current_index_id, _current_index);
+ }
+ _source_list->setCurrentIndex(_current_index_id);
+}
+
+
+Schema
+IndexMaintainer::getSchema(void) const
+{
+ LockGuard lock(_index_update_lock);
+ return _schema;
+}
+
+Schema::SP
+IndexMaintainer::getActiveFusionPrunedSchema(void) const
+{
+ LockGuard lock(_index_update_lock);
+ return _activeFusionPrunedSchema;
+}
+
+TuneFileAttributes
+IndexMaintainer::getAttrTune(void)
+{
+ return _tuneFileAttributes;
+}
+
+IndexMaintainer::ChangeGens
+IndexMaintainer::getChangeGens(void)
+{
+ LockGuard lock(_index_update_lock);
+ return _changeGens;
+}
+
+bool
+IndexMaintainer::reconfigure(std::unique_ptr<Configure> configure)
+{
+ // Called by a flush engine worker thread
+ bool result = false;
+ ReconfigRunnable runnable(result, _ctx.getReconfigurer(), std::move(configure));
+ _ctx.getThreadingService().master().run(runnable);
+ return result;
+}
+
+IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
+ const IndexMaintainerContext &ctx,
+ IIndexMaintainerOperations &operations)
+ : _base_dir(config.getBaseDir()),
+ _warmupConfig(config.getWarmup()),
+ _disk_indexes(std::make_shared<DiskIndexes>()),
+ _layout(config.getBaseDir()),
+ _schema(config.getSchema()),
+ _activeFusionSchema(),
+ _activeFusionPrunedSchema(),
+ _source_selector_changes(0),
+ _selector(),
+ _source_list(),
+ _last_fusion_id(),
+ _next_id(),
+ _current_index_id(),
+ _current_index(),
+ _flush_empty_current_index(false),
+ _current_serial_num(0),
+ _flush_serial_num(0),
+ _lastFlushTime(),
+ _frozenMemoryIndexes(),
+ _state_lock(),
+ _index_update_lock(),
+ _new_search_lock(),
+ _remove_lock(),
+ _fusion_spec(),
+ _fusion_lock(),
+ _maxFlushed(config.getMaxFlushed()),
+ _maxFrozen(10),
+ _changeGens(),
+ _schemaUpdateLock(),
+ _tuneFileAttributes(config.getTuneFileAttributes()),
+ _ctx(ctx),
+ _operations(operations)
+{
+ // Called by document db init executor thread
+ _changeGens.bumpPruneGen();
+ DiskIndexCleaner::clean(_base_dir, *_disk_indexes);
+ FusionSpec spec = IndexReadUtilities::readFusionSpec(_base_dir);
+ _next_id = 1 + (spec.flush_ids.empty() ? spec.last_fusion_id : spec.flush_ids.back());
+ _last_fusion_id = spec.last_fusion_id;
+
+ if (_next_id > 1) {
+ string latest_index_dir = spec.flush_ids.empty()
+ ? getFusionDir(_next_id - 1)
+ : getFlushDir(_next_id - 1);
+
+ set_flush_serial_num(IndexReadUtilities::readSerialNum(latest_index_dir));
+ _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir);
+ set_current_serial_num(flush_serial_num());
+ const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir);
+ _selector = FixedSourceSelector::load(selector, _next_id - 1);
+ } else {
+ set_flush_serial_num(0);
+ _selector = std::make_shared<FixedSourceSelector>(0, "sourceselector", 1);
+ }
+ uint32_t baseId(_selector->getBaseId());
+ if (_last_fusion_id != baseId) {
+ assert(_last_fusion_id > baseId);
+ uint32_t id_diff = _last_fusion_id - baseId;
+ ostringstream ost;
+ ost << "sourceselector_fusion(" << _last_fusion_id << ")";
+ _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff);
+ assert(_last_fusion_id == _selector->getBaseId());
+ }
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id;
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ _selector->setDefaultSource(_current_index_id);
+ auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector));
+ _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num());
+ LOG(debug, "Index manager created with flushed serial num %" PRIu64, flush_serial_num());
+ sourceList->append(_current_index_id, _current_index);
+ sourceList->setCurrentIndex(_current_index_id);
+ _source_list = std::move(sourceList);
+ _fusion_spec = spec;
+ _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() {
+ pruneRemovedFields(_schema, config.getSerialNum());
+ }));
+ _ctx.getThreadingService().master().sync();
+}
+
+IndexMaintainer::~IndexMaintainer()
+{
+ _source_list.reset();
+ _frozenMemoryIndexes.clear();
+ _selector.reset();
+}
+
+FlushTask::UP
+IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits
+ {
+ LockGuard lock(_index_update_lock);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
+ }
+
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num()));
+ FlushArgs args;
+ args.stats = stats;
+ // Ensure that all index thread tasks accessing memory index have completed.
+ commit_and_wait();
+ // Call reconfig closure for this change
+ auto configure = makeLambdaConfigure([this, argsP=&args, indexP=&new_index]() {
+ return doneInitFlush(argsP, indexP);
+ });
+ bool success = _ctx.getReconfigurer().reconfigure(std::move(configure));
+ assert(success);
+ (void) success;
+ if (args._skippedEmptyLast && args._extraIndexes.empty()) {
+ // No memory index to flush, it was empty
+ LockGuard lock(_state_lock);
+ set_flush_serial_num(current_serial_num());
+ _lastFlushTime = vespalib::system_clock::now();
+ LOG(debug, "No memory index to flush. Update serial number and flush time to current: "
+ "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)",
+ flush_serial_num(), vespalib::to_s(_lastFlushTime.time_since_epoch()));
+ return FlushTask::UP();
+ }
+ SerialNum realSerialNum = args.flush_serial_num;
+ return makeLambdaFlushTask([this, myargs=std::move(args)]() mutable { doFlush(std::move(myargs)); }, realSerialNum);
+}
+
+FusionSpec
+IndexMaintainer::getFusionSpec()
+{
+ // Only called by unit test
+ LockGuard guard(_fusion_lock);
+ return _fusion_spec;
+}
+
+string
+IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token)
+{
+ // Called by a flush engine worker thread
+
+ // Make sure to update serial num in case it is something that does not receive any data.
+ // XXX: Wrong, and will cause data loss.
+ // XXX: Missing locking.
+ // XXX: Claims to have flushed memory index when starting fusion.
+ {
+ LockGuard lock(_index_update_lock);
+ set_current_serial_num(std::max(current_serial_num(), serialNum));
+ }
+
+ FusionSpec spec;
+ {
+ LockGuard guard(_fusion_lock);
+ if (!canRunFusion(_fusion_spec))
+ return "";
+ spec = _fusion_spec;
+ _fusion_spec.flush_ids.clear();
+ }
+
+ uint32_t new_fusion_id = runFusion(spec, flush_token);
+
+ LockGuard lock(_fusion_lock);
+ if (new_fusion_id == spec.last_fusion_id) { // Error running fusion.
+ string fail_dir = getFusionDir(spec.flush_ids.back());
+ if (flush_token->stop_requested()) {
+ LOG(info, "Fusion stopped for id %u, fusion dir \"%s\".", spec.flush_ids.back(), fail_dir.c_str());
+ } else {
+ LOG(warning, "Fusion failed for id %u, fusion dir \"%s\".", spec.flush_ids.back(), fail_dir.c_str());
+ }
+ // Restore fusion spec.
+ copy(_fusion_spec.flush_ids.begin(), _fusion_spec.flush_ids.end(), back_inserter(spec.flush_ids));
+ _fusion_spec.flush_ids.swap(spec.flush_ids);
+ } else {
+ _fusion_spec.last_fusion_id = new_fusion_id;
+ }
+ return getFusionDir(new_fusion_id);
+}
+
+namespace {
+
+class RemoveFusionIndexGuard {
+ DiskIndexes* _disk_indexes;
+ IndexDiskDir _index_disk_dir;
+public:
+ RemoveFusionIndexGuard(DiskIndexes& disk_indexes, IndexDiskDir index_disk_dir)
+ : _disk_indexes(&disk_indexes),
+ _index_disk_dir(index_disk_dir)
+ {
+ _disk_indexes->add_not_active(index_disk_dir);
+ }
+ ~RemoveFusionIndexGuard() {
+ if (_disk_indexes != nullptr) {
+ (void) _disk_indexes->remove(_index_disk_dir);
+ }
+ }
+ void reset() { _disk_indexes = nullptr; }
+};
+
+}
+
+uint32_t
+IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token)
+{
+ // Called by a flush engine worker thread
+ FusionArgs args;
+ TuneFileAttributes tuneFileAttributes(getAttrTune());
+ {
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ _activeFusionSchema = std::make_shared<Schema>(_schema);
+ _activeFusionPrunedSchema.reset();
+ args._schema = _schema;
+ }
+ FastOS_StatInfo statInfo;
+ string lastFlushDir(getFlushDir(fusion_spec.flush_ids.back()));
+ string lastSerialFile = IndexDiskLayout::getSerialNumFileName(lastFlushDir);
+ SerialNum serialNum = 0;
+ if (FastOS_File::Stat(lastSerialFile.c_str(), &statInfo)) {
+ serialNum = IndexReadUtilities::readSerialNum(lastFlushDir);
+ }
+ IndexDiskDir fusion_index_disk_dir(fusion_spec.flush_ids.back(), true);
+ RemoveFusionIndexGuard remove_fusion_index_guard(*_disk_indexes, fusion_index_disk_dir);
+ FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext());
+ uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, flush_token);
+ bool ok = (new_fusion_id != 0);
+ if (ok) {
+ ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()),
+ getFusionDir(new_fusion_id));
+ }
+ if (!ok) {
+ string fail_dir = getFusionDir(fusion_spec.flush_ids.back());
+ if (flush_token->stop_requested()) {
+ LOG(info, "Fusion stopped, fusion dir \"%s\".", fail_dir.c_str());
+ } else {
+ LOG(error, "Fusion failed, fusion dir \"%s\".", fail_dir.c_str());
+ }
+ FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str());
+ {
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ _activeFusionSchema.reset();
+ _activeFusionPrunedSchema.reset();
+ }
+ vespalib::File::sync(vespalib::dirname(fail_dir));
+ return fusion_spec.last_fusion_id;
+ }
+
+ const string new_fusion_dir = getFusionDir(new_fusion_id);
+ Schema::SP prunedSchema = getActiveFusionPrunedSchema();
+ if (prunedSchema) {
+ updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh);
+ }
+ ChangeGens changeGens = getChangeGens();
+ IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir));
+ remove_fusion_index_guard.reset();
+
+ // Post processing after fusion operation has completed and new disk
+ // index has been opened.
+
+ args._new_fusion_id = new_fusion_id;
+ args._changeGens = changeGens;
+ args._prunedSchema = prunedSchema;
+ for (;;) {
+ // Call reconfig closure for this change
+ bool success = reconfigure(makeLambdaConfigure([this,argsP=&args,indexP=&new_index]() {
+ return doneFusion(argsP, indexP);
+ }));
+ if (success) {
+ break;
+ }
+ changeGens = getChangeGens();
+ prunedSchema = getActiveFusionPrunedSchema();
+ if (prunedSchema) {
+ updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh);
+ }
+ IDiskIndex::SP diskIndex2;
+ diskIndex2 = reloadDiskIndex(*new_index);
+ new_index = diskIndex2;
+ args._changeGens = changeGens;
+ args._prunedSchema = prunedSchema;
+ }
+ removeOldDiskIndexes();
+
+ return new_fusion_id;
+}
+
+void
+IndexMaintainer::removeOldDiskIndexes()
+{
+ LockGuard slock(_remove_lock);
+ DiskIndexCleaner::removeOldIndexes(_base_dir, *_disk_indexes);
+}
+
+IndexMaintainer::FlushStats
+IndexMaintainer::getFlushStats() const
+{
+ // Called by flush engine scheduler thread (from getFlushTargets())
+ FlushStats stats;
+ uint64_t source_selector_bytes;
+ uint32_t source_selector_changes;
+ uint32_t numFrozen = 0;
+ {
+ LockGuard lock(_index_update_lock);
+ source_selector_bytes = _selector->getDocIdLimit() * sizeof(Source);
+ stats.memory_before_bytes += _current_index->getMemoryUsage().allocatedBytes() + source_selector_bytes;
+ stats.memory_after_bytes += _current_index->getStaticMemoryFootprint() + source_selector_bytes;
+ numFrozen = _frozenMemoryIndexes.size();
+ for (const FrozenMemoryIndexRef & frozen : _frozenMemoryIndexes) {
+ stats.memory_before_bytes += frozen._index->getMemoryUsage().allocatedBytes() + source_selector_bytes;
+ }
+ source_selector_changes = _source_selector_changes;
+ }
+
+ if (!source_selector_changes && stats.memory_after_bytes >= stats.memory_before_bytes) {
+ // Nothing is written if the index is empty.
+ stats.disk_write_bytes = 0;
+ stats.cpu_time_required = 0;
+ } else {
+ stats.disk_write_bytes = stats.memory_before_bytes + source_selector_bytes - stats.memory_after_bytes;
+ stats.cpu_time_required = source_selector_bytes * 3 * (1 + numFrozen) + stats.disk_write_bytes;
+ }
+ return stats;
+}
+
+IndexMaintainer::FusionStats
+IndexMaintainer::getFusionStats() const
+{
+ // Called by flush engine scheduler thread (from getFlushTargets())
+ FusionStats stats;
+ IndexSearchable::SP source_list;
+
+ {
+ LockGuard lock(_new_search_lock);
+ source_list = _source_list;
+ stats.maxFlushed = _maxFlushed;
+ }
+ stats.diskUsage = source_list->getSearchableStats().sizeOnDisk();
+ {
+ LockGuard guard(_fusion_lock);
+ stats.numUnfused = _fusion_spec.flush_ids.size() + ((_fusion_spec.last_fusion_id != 0) ? 1 : 0);
+ stats._canRunFusion = canRunFusion(_fusion_spec);
+ }
+ LOG(debug, "Get fusion stats. Disk usage: %" PRIu64 ", maxflushed: %d", stats.diskUsage, stats.maxFlushed);
+ return stats;
+}
+
+uint32_t
+IndexMaintainer::getNumFrozenMemoryIndexes(void) const
+{
+ // Called by flush engine scheduler thread (from getFlushTargets())
+ LockGuard state_lock(_index_update_lock);
+ return _frozenMemoryIndexes.size();
+}
+
+void
+IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done)
+{
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LockGuard lock(_index_update_lock);
+ try {
+ _current_index->insertDocument(lid, doc, on_write_done);
+ } catch (const vespalib::IllegalStateException & e) {
+ vespalib::string s = "Failed inserting document :\n" + doc.toXml(" ") + "\n";
+ LOG(error, "%s", s.c_str());
+ throw vespalib::IllegalStateException(s, e, VESPA_STRLOC);
+ }
+ _selector->setSource(lid, _current_index_id);
+ _source_list->setSource(lid);
+ ++_source_selector_changes;
+ set_current_serial_num(serialNum);
+}
+
+void
+IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LockGuard lock(_index_update_lock);
+ for (uint32_t lid : lids) {
+ _selector->setSource(lid, _current_index_id);
+ _source_list->setSource(lid);
+ }
+ _source_selector_changes += lids.size();
+ set_current_serial_num(serialNum);
+ _current_index->removeDocuments(std::move(lids));
+}
+
+void
+IndexMaintainer::commit_and_wait()
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ vespalib::Gate gate;
+ _ctx.getThreadingService().index().execute(makeLambdaTask([this, &gate]() { commit(gate); }));
+ // Ensure that all index thread tasks accessing memory index have completed.
+ gate.await();
+}
+
+void
+IndexMaintainer::commit(vespalib::Gate& gate)
+{
+ // only triggered via commit_and_wait()
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LockGuard lock(_index_update_lock);
+ _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num());
+}
+
+void
+IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone)
+{
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LockGuard lock(_index_update_lock);
+ set_current_serial_num(serialNum);
+ _current_index->commit(onWriteDone, serialNum);
+}
+
+void
+IndexMaintainer::heartBeat(SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LockGuard lock(_index_update_lock);
+ set_current_serial_num(serialNum);
+}
+
+void
+IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().index().isCurrentThread());
+ LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum);
+ LockGuard lock(_index_update_lock);
+ set_current_serial_num(serialNum);
+ _selector->compactLidSpace(lidLimit);
+}
+
+IFlushTarget::List
+IndexMaintainer::getFlushTargets()
+{
+ // Called by flush engine scheduler thread
+ IFlushTarget::List ret;
+ ret.reserve(2);
+ ret.push_back(std::make_shared<IndexFlushTarget>(*this));
+ ret.push_back(std::make_shared<IndexFusionTarget>(*this));
+ return ret;
+}
+
+void
+IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ pruneRemovedFields(schema, serialNum);
+ IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num()));
+ SetSchemaArgs args;
+
+ args._newSchema = schema;
+ // Ensure that all index thread tasks accessing memory index have completed.
+ commit_and_wait();
+ // Everything should be quiet now.
+ doneSetSchema(args, new_index);
+ // Source collection has now changed, caller must reconfigure further
+ // as appropriate.
+}
+
+void
+IndexMaintainer::pruneRemovedFields(const Schema &schema, SerialNum serialNum)
+{
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ ISearchableIndexCollection::SP new_source_list;
+ IIndexCollection::SP coll = getSourceCollection();
+ updateIndexSchemas(*coll, schema, serialNum);
+ updateActiveFusionPrunedSchema(schema);
+ {
+ LockGuard state_lock(_state_lock);
+ LockGuard lock(_index_update_lock);
+ _changeGens.bumpPruneGen();
+ }
+ {
+ LockGuard state_lock(_state_lock);
+ new_source_list = std::make_shared<IndexCollection>(_selector, *_source_list);
+ }
+ if (reopenDiskIndexes(*new_source_list)) {
+ commit_and_wait();
+ // Everything should be quiet now.
+ LockGuard state_lock(_state_lock);
+ LockGuard lock(_new_search_lock);
+ _source_list = new_source_list;
+ }
+}
+
+void
+IndexMaintainer::setMaxFlushed(uint32_t maxFlushed)
+{
+ LockGuard lock(_new_search_lock);
+ _maxFlushed = maxFlushed;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
new file mode 100644
index 00000000000..b3fb14e1c2e
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -0,0 +1,376 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "iindexmanager.h"
+#include "disk_indexes.h"
+#include "fusionspec.h"
+#include "idiskindex.h"
+#include "iindexmaintaineroperations.h"
+#include "indexdisklayout.h"
+#include "indexmaintainerconfig.h"
+#include "indexmaintainercontext.h"
+#include "imemoryindex.h"
+#include "warmupindexcollection.h"
+#include "ithreadingservice.h"
+#include "indexsearchable.h"
+#include "indexcollection.h"
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include <vespa/searchcorespi/flush/flushstats.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <atomic>
+
+namespace document { class Document; }
+
+namespace search::common { class FileHeaderContext; }
+namespace vespalib { class Gate; }
+
+namespace searchcorespi::index {
+
+/**
+ * The IndexMaintainer provides a holistic view of a set of disk and
+ * memory indexes. It allows updating the active memory index, enables search
+ * across all indexes, and manages the set of indexes through flushing
+ * of memory indexes and fusion of disk indexes.
+ */
+class IndexMaintainer : public IIndexManager,
+ public IWarmupDone {
+ /**
+ * Extra memory that is frozen but not yet flushed.
+ */
+ class FrozenMemoryIndexRef {
+ public:
+ typedef search::FixedSourceSelector::SaveInfo SaveInfo;
+ typedef search::SerialNum SerialNum;
+ typedef std::shared_ptr<SaveInfo> SaveInfoSP;
+
+ IMemoryIndex::SP _index;
+ SerialNum _serialNum;
+ SaveInfoSP _saveInfo;
+ uint32_t _absoluteId;
+
+ FrozenMemoryIndexRef(const IMemoryIndex::SP &index,
+ SerialNum serialNum,
+ SaveInfo::UP saveInfo,
+ uint32_t absoluteId)
+ : _index(index),
+ _serialNum(serialNum),
+ _saveInfo(saveInfo.release()),
+ _absoluteId(absoluteId)
+ { }
+ ~FrozenMemoryIndexRef();
+ };
+
+ class ChangeGens {
+ public:
+ uint32_t _pruneGen;
+
+ ChangeGens() : _pruneGen(0) { }
+ void bumpPruneGen(void) { ++_pruneGen; }
+ bool operator==(const ChangeGens &rhs) const { return _pruneGen == rhs._pruneGen; }
+ bool operator!=(const ChangeGens &rhs) const { return _pruneGen != rhs._pruneGen; }
+ };
+
+ using FlushIds = std::vector<uint32_t>;
+ using FrozenMemoryIndexRefs = std::vector<FrozenMemoryIndexRef>;
+ using ISourceSelector = search::queryeval::ISourceSelector;
+ using LockGuard = std::lock_guard<std::mutex>;
+
+ const vespalib::string _base_dir;
+ const WarmupConfig _warmupConfig;
+ DiskIndexes::SP _disk_indexes;
+ IndexDiskLayout _layout;
+ Schema _schema; // Protected by SL + IUL
+ Schema::SP _activeFusionSchema; // Protected by SL + IUL
+ // Protected by SL + IUL
+ Schema::SP _activeFusionPrunedSchema;
+ uint32_t _source_selector_changes; // Protected by IUL
+ // _selector is protected by SL + IUL
+ ISourceSelector::SP _selector;
+ ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread
+ uint32_t _last_fusion_id; // Protected by SL + IUL
+ uint32_t _next_id; // Protected by SL + IUL
+ uint32_t _current_index_id; // Protected by SL + IUL
+ IMemoryIndex::SP _current_index; // Protected by SL + IUL
+ bool _flush_empty_current_index;
+ std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL
+ std::atomic<SerialNum> _flush_serial_num; // Writes protected by SL
+ vespalib::system_time _lastFlushTime; // Protected by SL
+ // Extra frozen memory indexes. This list is empty unless new
+ // memory index has been added by force (due to config change or
+ // data structure limitations).
+ // Protected by SL + IUL
+ FrozenMemoryIndexRefs _frozenMemoryIndexes;
+ /*
+ * Locks protecting state.
+ *
+ * Note about locking:
+ *
+ * A variable can be protected by multiple locks, e.g. SL + NSL.
+ * To change the variable, all of these locks must be held.
+ * To read the variable, holding any of these locks is sufficient.
+ *
+ * In the example above, getting NSL typically has lower latency, but
+ * fewer variables can be retrieved. Getting SL has a higher latency
+ * and allows a snapshot of multiple variables depending on each other
+ * to be retrieved.
+ *
+ * Flush threads typically performs some setup (take SL, copy a
+ * relevant portion of variables to an args class (FlushArgs,
+ * FusionArgs, SetSchemaArgs) and perform some disk io) that takes
+ * time before scheduling a state change task performed by the
+ * document db master thread.
+ *
+ * The scheduled state change task will fail if state changed too much
+ * after setup by the flush thread. The flush thread must then retry
+ * with an updated setup.
+ *
+ * Things get more complicated when handling multiple kinds of overlapping
+ * flush operations, e.g. dump from memory to disk, fusion, schema changes
+ * and pruning of removed fields, since this will trigger more retries for
+ * some of the operations.
+ */
+ std::mutex _state_lock; // Outer lock (SL)
+ mutable std::mutex _index_update_lock; // Inner lock (IUL)
+ mutable std::mutex _new_search_lock; // Inner lock (NSL)
+ std::mutex _remove_lock; // Lock for removing indexes.
+ // Protected by SL + IUL
+ FusionSpec _fusion_spec; // Protected by FL
+ mutable std::mutex _fusion_lock; // Fusion spec lock (FL)
+ uint32_t _maxFlushed;
+ uint32_t _maxFrozen;
+ ChangeGens _changeGens; // Protected by SL + IUL
+ std::mutex _schemaUpdateLock; // Serialize rewrite of schema
+ const search::TuneFileAttributes _tuneFileAttributes;
+ const IndexMaintainerContext _ctx;
+ IIndexMaintainerOperations &_operations;
+
+ search::FixedSourceSelector & getSourceSelector() { return static_cast<search::FixedSourceSelector &>(*_selector); }
+ const search::FixedSourceSelector & getSourceSelector() const { return static_cast<const search::FixedSourceSelector &>(*_selector); }
+ uint32_t getNewAbsoluteId();
+ vespalib::string getFlushDir(uint32_t sourceId) const;
+ vespalib::string getFusionDir(uint32_t sourceId) const;
+
+ /**
+ * Will reopen diskindexes if necessary due to schema changes.
+ * @param coll Indexcollection that will be updated with reloaded index.
+ * @return true if any reload has been performed
+ */
+ bool reopenDiskIndexes(ISearchableIndexCollection &coll);
+
+ void updateDiskIndexSchema(const vespalib::string &indexDir,
+ const Schema &schema,
+ SerialNum serialNum);
+
+ void updateIndexSchemas(IIndexCollection &coll,
+ const Schema &schema,
+ SerialNum serialNum);
+
+ void updateActiveFusionPrunedSchema(const Schema &schema);
+ void deactivateDiskIndexes(vespalib::string indexDir);
+ IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir);
+ IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex);
+
+ IDiskIndex::SP flushMemoryIndex(IMemoryIndex &memoryIndex,
+ uint32_t indexId,
+ uint32_t docIdLimit,
+ SerialNum serialNum,
+ search::FixedSourceSelector::SaveInfo &saveInfo);
+
+ ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList);
+ void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source);
+ void appendSource(uint32_t sourceId, const IndexSearchable::SP &source);
+ void swapInNewIndex(LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source);
+ ISearchableIndexCollection::UP createNewSourceCollection(const LockGuard &newSearchLock);
+
+ struct FlushArgs {
+ IMemoryIndex::SP old_index; // Last memory index
+ uint32_t old_absolute_id;
+ ISearchableIndexCollection::SP old_source_list; // Delays destruction
+ search::FixedSourceSelector::SaveInfo::SP save_info;
+ SerialNum flush_serial_num;
+ FlushStats * stats;
+ bool _skippedEmptyLast; // Don't flush empty memory index
+
+ // Extra indexes to flush before flushing last frozen memory index
+ // They are flushed before old_index. This list is empty unless
+ // new memory index has been added by force (due to config change
+ // or data structure limitations).
+ FrozenMemoryIndexRefs _extraIndexes;
+ ChangeGens _changeGens;
+ Schema::SP _prunedSchema;
+
+ FlushArgs();
+ FlushArgs(const FlushArgs &) = delete;
+ FlushArgs & operator=(const FlushArgs &) = delete;
+ FlushArgs(FlushArgs &&);
+ FlushArgs & operator=(FlushArgs &&);
+ ~FlushArgs();
+ };
+
+ bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index);
+ void doFlush(FlushArgs args);
+ void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds);
+ void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds);
+ void updateFlushStats(const FlushArgs &args);
+ void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit,
+ search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds);
+ void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex);
+ bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index);
+
+
+ class FusionArgs {
+ public:
+ uint32_t _new_fusion_id;
+ ChangeGens _changeGens;
+ Schema _schema;
+ Schema::SP _prunedSchema;
+ ISearchableIndexCollection::SP _old_source_list; // Delays destruction
+
+ FusionArgs();
+ ~FusionArgs();
+ };
+
+ void scheduleFusion(const FlushIds &flushIds);
+ bool canRunFusion(const FusionSpec &spec) const;
+ bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index);
+
+ class SetSchemaArgs {
+ public:
+ Schema _newSchema;
+ Schema _oldSchema;
+ IMemoryIndex::SP _oldIndex;
+ ISearchableIndexCollection::SP _oldSourceList; // Delays destruction
+
+ SetSchemaArgs();
+ ~SetSchemaArgs();
+ };
+
+ void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex);
+
+ Schema getSchema(void) const;
+ Schema::SP getActiveFusionPrunedSchema() const;
+ search::TuneFileAttributes getAttrTune();
+ ChangeGens getChangeGens();
+
+ /*
+ * Schedule document db executor task to use reconfigurer to
+ * reconfigure index manager with closure as argument. Wait for
+ * result.
+ */
+ bool reconfigure(std::unique_ptr<Configure> configure);
+ void warmupDone(std::shared_ptr<WarmupIndexCollection> current) override;
+ bool makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive);
+ void commit_and_wait();
+ void commit(vespalib::Gate& gate);
+ void pruneRemovedFields(const Schema &schema, SerialNum serialNum);
+ [[nodiscard]] SerialNum current_serial_num() const noexcept {
+ return _current_serial_num.load(std::memory_order_relaxed);
+ }
+ void set_current_serial_num(SerialNum new_serial_num) noexcept {
+ _current_serial_num.store(new_serial_num, std::memory_order_relaxed);
+ }
+ [[nodiscard]] SerialNum flush_serial_num() const noexcept {
+ return _flush_serial_num.load(std::memory_order_relaxed);
+ }
+ void set_flush_serial_num(SerialNum new_serial_num) noexcept {
+ _flush_serial_num.store(new_serial_num, std::memory_order_relaxed);
+ }
+
+public:
+ IndexMaintainer(const IndexMaintainer &) = delete;
+ IndexMaintainer & operator = (const IndexMaintainer &) = delete;
+ IndexMaintainer(const IndexMaintainerConfig &config,
+ const IndexMaintainerContext &context,
+ IIndexMaintainerOperations &operations);
+ ~IndexMaintainer() override;
+
+ /**
+ * Starts a new MemoryIndex, and dumps the previous one to disk.
+ * Updates flush stats when finished if specified.
+ **/
+ FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats);
+ FusionSpec getFusionSpec();
+
+ /**
+ * Runs fusion for any available specs and return the output fusion directory.
+ */
+ vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token);
+ uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token);
+ void removeOldDiskIndexes();
+
+ struct FlushStats {
+ explicit FlushStats(uint64_t memory_before=0) :
+ memory_before_bytes(memory_before),
+ memory_after_bytes(0),
+ disk_write_bytes(0),
+ cpu_time_required(0)
+ { }
+
+ uint64_t memory_before_bytes;
+ uint64_t memory_after_bytes;
+ uint64_t disk_write_bytes;
+ uint64_t cpu_time_required;
+ };
+
+ struct FusionStats {
+ FusionStats()
+ : diskUsage(0),
+ maxFlushed(0),
+ numUnfused(0),
+ _canRunFusion(false)
+ { }
+
+ uint64_t diskUsage;
+ uint32_t maxFlushed;
+ uint32_t numUnfused;
+ bool _canRunFusion;
+ };
+
+ /**
+ * Calculates an approximation of the cost of performing a flush.
+ **/
+ FlushStats getFlushStats() const;
+ FusionStats getFusionStats() const;
+ const vespalib::string & getBaseDir() const { return _base_dir; }
+ uint32_t getNumFrozenMemoryIndexes() const;
+ uint32_t getMaxFrozenMemoryIndexes() const { return _maxFrozen; }
+
+ vespalib::system_time getLastFlushTime() const { return _lastFlushTime; }
+
+ // Implements IIndexManager
+ void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done) override;
+ void removeDocuments(LidVector lids, SerialNum serialNum) override;
+ void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) override;
+ void heartBeat(search::SerialNum serialNum) override;
+ void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override;
+
+ SerialNum getCurrentSerialNum() const override {
+ return current_serial_num();
+ }
+
+ SerialNum getFlushedSerialNum() const override {
+ return flush_serial_num();
+ }
+
+ IIndexCollection::SP getSourceCollection() const {
+ LockGuard lock(_new_search_lock);
+ return _source_list;
+ }
+
+ searchcorespi::IndexSearchable::SP getSearchable() const override {
+ LockGuard lock(_new_search_lock);
+ return _source_list;
+ }
+
+ search::SearchableStats getSearchableStats() const override {
+ LockGuard lock(_new_search_lock);
+ return _source_list->getSearchableStats();
+ }
+
+ IFlushTarget::List getFlushTargets() override;
+ void setSchema(const Schema & schema, SerialNum serialNum) override ;
+ void setMaxFlushed(uint32_t maxFlushed) override;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp
new file mode 100644
index 00000000000..695de7b84ff
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp
@@ -0,0 +1,27 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexmaintainerconfig.h"
+
+using search::index::Schema;
+using search::TuneFileAttributes;
+
+namespace searchcorespi::index {
+
+IndexMaintainerConfig::IndexMaintainerConfig(const vespalib::string &baseDir,
+ const WarmupConfig & warmup,
+ size_t maxFlushed,
+ const Schema &schema,
+ const search::SerialNum serialNum,
+ const TuneFileAttributes &tuneFileAttributes)
+ : _baseDir(baseDir),
+ _warmup(warmup),
+ _maxFlushed(maxFlushed),
+ _schema(schema),
+ _serialNum(serialNum),
+ _tuneFileAttributes(tuneFileAttributes)
+{
+}
+
+IndexMaintainerConfig::~IndexMaintainerConfig() { }
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h
new file mode 100644
index 00000000000..3f890e6fa76
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h
@@ -0,0 +1,64 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "warmupconfig.h"
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Class that keeps the config used when constructing an index maintainer.
+ */
+class IndexMaintainerConfig {
+private:
+ const vespalib::string _baseDir;
+ const WarmupConfig _warmup;
+ const size_t _maxFlushed;
+ const search::index::Schema _schema;
+ const search::SerialNum _serialNum;
+ const search::TuneFileAttributes _tuneFileAttributes;
+
+public:
+ IndexMaintainerConfig(const vespalib::string &baseDir,
+ const WarmupConfig & warmup,
+ size_t maxFlushed,
+ const search::index::Schema &schema,
+ const search::SerialNum serialNum,
+ const search::TuneFileAttributes &tuneFileAttributes);
+
+ ~IndexMaintainerConfig();
+
+ /**
+ * Returns the base directory in which the maintainer will store its indexes.
+ */
+ const vespalib::string &getBaseDir() const {
+ return _baseDir;
+ }
+
+ WarmupConfig getWarmup() const { return _warmup; }
+
+ /**
+ * Returns the initial schema containing all current index fields.
+ */
+ const search::index::Schema &getSchema() const {
+ return _schema;
+ }
+
+ search::SerialNum getSerialNum() const { return _serialNum; }
+
+ /**
+ * Returns the specification on how to read/write attribute vector data files.
+ */
+ const search::TuneFileAttributes &getTuneFileAttributes() const {
+ return _tuneFileAttributes;
+ }
+
+ size_t getMaxFlushed() const {
+ return _maxFlushed;
+ }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
new file mode 100644
index 00000000000..efd7827fc3d
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexmaintainercontext.h"
+
+using search::common::FileHeaderContext;
+using search::TuneFileAttributes;
+using searchcorespi::IIndexManager;
+
+namespace searchcorespi::index {
+
+IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService,
+ IIndexManager::Reconfigurer &reconfigurer,
+ const FileHeaderContext &fileHeaderContext,
+ vespalib::Executor & warmupExecutor)
+ : _threadingService(threadingService),
+ _reconfigurer(reconfigurer),
+ _fileHeaderContext(fileHeaderContext),
+ _warmupExecutor(warmupExecutor)
+{
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h
new file mode 100644
index 00000000000..2c7aa4af48e
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h
@@ -0,0 +1,55 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "ithreadingservice.h"
+#include "iindexmanager.h"
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/searchlib/common/fileheadercontext.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Class that keeps the long-lived context used by an index maintainer.
+ */
+class IndexMaintainerContext {
+private:
+ IThreadingService &_threadingService;
+ IIndexManager::Reconfigurer &_reconfigurer;
+ const search::common::FileHeaderContext &_fileHeaderContext;
+ vespalib::Executor & _warmupExecutor;
+
+public:
+ IndexMaintainerContext(IThreadingService &threadingService,
+ IIndexManager::Reconfigurer &reconfigurer,
+ const search::common::FileHeaderContext &fileHeaderContext,
+ vespalib::Executor & warmupExecutor);
+
+ /**
+ * Returns the treading service that encapsulates the thread model used for writing.
+ */
+ IThreadingService &getThreadingService() const {
+ return _threadingService;
+ }
+
+ /**
+ * Returns the reconfigurer used to signal when the index maintainer has changed.
+ */
+ IIndexManager::Reconfigurer &getReconfigurer() const {
+ return _reconfigurer;
+ }
+
+ /**
+ * Returns the context used to insert extra tags into file headers before writing them.
+ */
+ const search::common::FileHeaderContext &getFileHeaderContext() const {
+ return _fileHeaderContext;
+ }
+
+ /**
+ * @return The executor that should be used for warmup.
+ */
+ vespalib::Executor & getWarmupExecutor() const { return _warmupExecutor; }
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp
new file mode 100644
index 00000000000..8ba9efe2734
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp
@@ -0,0 +1,19 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexmanagerconfig.h"
+
+namespace searchcorespi {
+
+IndexManagerConfig::IndexManagerConfig(const vespalib::string &configId,
+ const config::ConfigSnapshot &configSnapshot,
+ size_t numSearcherThreads)
+ : _configId(configId),
+ _configSnapshot(configSnapshot),
+ _numSearcherThreads(numSearcherThreads)
+{
+}
+
+IndexManagerConfig::~IndexManagerConfig() { }
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h
new file mode 100644
index 00000000000..decb03d97e0
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/config/retriever/configsnapshot.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+
+/**
+ * Class that keeps the config used when constructing an index manager.
+ */
+class IndexManagerConfig {
+private:
+ vespalib::string _configId;
+ const config::ConfigSnapshot &_configSnapshot;
+ size_t _numSearcherThreads;
+
+public:
+ IndexManagerConfig(const vespalib::string &configId,
+ const config::ConfigSnapshot &configSnapshot,
+ size_t numSearcherThreads);
+ ~IndexManagerConfig();
+
+ /**
+ * Returns the config id used to retrieve the configs from the config snapshot instance.
+ */
+ const vespalib::string &getConfigId() const { return _configId; }
+
+ /**
+ * Returns the snapshot containing configs to be used by the index manager.
+ */
+ const config::ConfigSnapshot &getConfigSnapshot() const { return _configSnapshot; }
+
+ /**
+ * Returns the number of searcher threads that are used to query the index manager.
+ */
+ size_t getNumSearcherThreads() const { return _numSearcherThreads; }
+};
+
+} // namespace searchcorespi
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp
new file mode 100644
index 00000000000..14556ddef29
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp
@@ -0,0 +1,88 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "indexreadutilities.h"
+#include "indexdisklayout.h"
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/vespalib/data/fileheader.h>
+#include <set>
+#include <vector>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexreadutilities");
+
+using search::SerialNum;
+using vespalib::FileHeader;
+
+namespace searchcorespi::index {
+
+namespace {
+
+/**
+ * Assumes that cleanup has removed all obsolete index dirs.
+ **/
+void
+scanForIndexes(const vespalib::string &baseDir,
+ std::vector<vespalib::string> &flushDirs,
+ vespalib::string &fusionDir)
+{
+ FastOS_DirectoryScan dirScan(baseDir.c_str());
+ while (dirScan.ReadNext()) {
+ if (!dirScan.IsDirectory()) {
+ continue;
+ }
+ vespalib::string name = dirScan.GetName();
+ if (name.find(IndexDiskLayout::FlushDirPrefix) == 0) {
+ flushDirs.push_back(name);
+ }
+ if (name.find(IndexDiskLayout::FusionDirPrefix) == 0) {
+ if (!fusionDir.empty()) {
+ // Should never happen, since we run cleanup before load.
+ LOG(warning, "Base directory '%s' contains multiple fusion indexes",
+ baseDir.c_str());
+ }
+ fusionDir = name;
+ }
+ }
+}
+
+}
+
+FusionSpec
+IndexReadUtilities::readFusionSpec(const vespalib::string &baseDir)
+{
+ std::vector<vespalib::string> flushDirs;
+ vespalib::string fusionDir;
+ scanForIndexes(baseDir, flushDirs, fusionDir);
+
+ uint32_t fusionId = 0;
+ if (!fusionDir.empty()) {
+ fusionId = atoi(fusionDir.substr(IndexDiskLayout::FusionDirPrefix.size()).c_str());
+ }
+ std::set<uint32_t> flushIds;
+ for (size_t i = 0; i < flushDirs.size(); ++i) {
+ uint32_t id = atoi(flushDirs[i].substr(IndexDiskLayout::FlushDirPrefix.size()).c_str());
+ flushIds.insert(id);
+ }
+
+ FusionSpec fusionSpec;
+ fusionSpec.last_fusion_id = fusionId;
+ fusionSpec.flush_ids.assign(flushIds.begin(), flushIds.end());
+ return fusionSpec;
+}
+
+SerialNum
+IndexReadUtilities::readSerialNum(const vespalib::string &dir)
+{
+ const vespalib::string fileName = IndexDiskLayout::getSerialNumFileName(dir);
+ Fast_BufferedFile file;
+ file.ReadOpen(fileName.c_str());
+
+ FileHeader fileHeader;
+ fileHeader.readFile(file);
+ if (fileHeader.hasTag(IndexDiskLayout::SerialNumTag)) {
+ return fileHeader.getTag(IndexDiskLayout::SerialNumTag).asInteger();
+ }
+ return 0;
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h
new file mode 100644
index 00000000000..aeafd746772
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "fusionspec.h"
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Utility class with functions to read aspects of an index from disk.
+ * Used by the index maintainer.
+ */
+struct IndexReadUtilities {
+ static FusionSpec readFusionSpec(const vespalib::string &baseDir);
+ static search::SerialNum readSerialNum(const vespalib::string &dir);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/indexsearchable.h b/searchcore/src/vespa/searchcorespi/index/indexsearchable.h
new file mode 100644
index 00000000000..609d7854351
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexsearchable.h
@@ -0,0 +1,59 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcommon/attribute/iattributecontext.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/index/i_field_length_inspector.h>
+#include <vespa/searchlib/query/tree/node.h>
+#include <vespa/searchlib/queryeval/blueprint.h>
+#include <vespa/searchlib/queryeval/field_spec.h>
+#include <vespa/searchlib/queryeval/irequestcontext.h>
+#include <vespa/searchlib/queryeval/searchable.h>
+#include <vespa/searchlib/util/searchable_stats.h>
+
+namespace searchcorespi {
+
+class IndexSearchableVisitor;
+
+/**
+ * Abstract class extended by components to expose content that can be
+ * searched by a query term. A IndexSearchable component supports searching
+ * in one or more named fields. The Blueprint created by a Searchable
+ * is an intermediate query representation that is later used to
+ * create the actual search iterators used to produce matches.
+ *
+ * The class is a specialized version of search::queryeval::Searchable
+ * that let the components access a per query attribute context that expose
+ * attribute vectors that can be utilized during query evaluation.
+ **/
+class IndexSearchable : public search::queryeval::Searchable,
+ public search::index::IFieldLengthInspector {
+protected:
+ using IRequestContext = search::queryeval::IRequestContext;
+ using FieldSpec = search::queryeval::FieldSpec;
+ using FieldSpecList = search::queryeval::FieldSpecList;
+ using Node = search::query::Node;
+ using IAttributeContext = search::attribute::IAttributeContext;
+ using Blueprint = search::queryeval::Blueprint;
+public:
+ typedef std::shared_ptr<IndexSearchable> SP;
+
+ /**
+ * Returns the searchable stats for this index searchable.
+ */
+ virtual search::SearchableStats getSearchableStats() const = 0;
+
+ /**
+ * Returns the serial number for this index searchable.
+ */
+ virtual search::SerialNum getSerialNum() const = 0;
+
+ /**
+ * Calls visitor with properly downcasted argument to differentiate
+ * between different types of indexes (disk index or memory index).
+ */
+ virtual void accept(IndexSearchableVisitor &visitor) const = 0;
+};
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h b/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h
new file mode 100644
index 00000000000..f85a2cf4af6
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h
@@ -0,0 +1,26 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+namespace searchcorespi {
+
+namespace index {
+
+struct IDiskIndex;
+struct IMemoryIndex;
+
+}
+
+/*
+ * Interface for visiting an index searchable containing disk and
+ * memory indexes.
+ */
+class IndexSearchableVisitor
+{
+public:
+ virtual ~IndexSearchableVisitor() { }
+ virtual void visit(const index::IDiskIndex &index) = 0;
+ virtual void visit(const index::IMemoryIndex &index) = 0;
+};
+
+} // namespace searchcorespi
diff --git a/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp
new file mode 100644
index 00000000000..cc2575f74d2
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp
@@ -0,0 +1,194 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "indexwriteutilities.h"
+#include "indexdisklayout.h"
+#include "indexreadutilities.h"
+#include <vespa/searchlib/common/serialnumfileheadercontext.h>
+#include <vespa/searchlib/index/schemautil.h>
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/exceptions.h>
+#include <sstream>
+#include <unistd.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexwriteutilities");
+
+
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::common::FileHeaderContext;
+using search::common::SerialNumFileHeaderContext;
+using search::index::Schema;
+using search::index::SchemaUtil;
+using search::SerialNum;
+using vespalib::IllegalStateException;
+using vespalib::FileHeader;
+
+namespace searchcorespi::index {
+
+namespace {
+
+SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max();
+
+}
+
+void
+IndexWriteUtilities::writeSerialNum(SerialNum serialNum,
+ const vespalib::string &dir,
+ const FileHeaderContext &fileHeaderContext)
+{
+ const vespalib::string fileName =
+ IndexDiskLayout::getSerialNumFileName(dir);
+ const vespalib::string tmpFileName = fileName + ".tmp";
+
+ SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext, serialNum);
+ Fast_BufferedFile file;
+ file.WriteOpen(tmpFileName.c_str());
+ FileHeader fileHeader;
+ snFileHeaderContext.addTags(fileHeader, fileName);
+ fileHeader.putTag(FileHeader::Tag(IndexDiskLayout::SerialNumTag, serialNum));
+ bool ok = (fileHeader.writeFile(file) >= fileHeader.getSize());
+ if ( ! ok) {
+ LOG(error, "Unable to write file header '%s'", tmpFileName.c_str());
+ }
+ if ( ! file.Sync()) {
+ ok = false;
+ LOG(error, "Unable to fsync '%s'", tmpFileName.c_str());
+ }
+ if ( ! file.Close()) {
+ ok = false;
+ LOG(error, "Unable to close '%s'", tmpFileName.c_str());
+ }
+ vespalib::File::sync(dir);
+
+ if (ok) {
+ FastOS_File renameFile(tmpFileName.c_str());
+ ok &= renameFile.Rename(fileName.c_str());
+ }
+ if (!ok) {
+ std::ostringstream msg;
+ msg << "Unable to write serial number to '" << dir << "'.";
+ throw IllegalStateException(msg.str());
+ }
+ vespalib::File::sync(dir);
+}
+
+bool
+IndexWriteUtilities::copySerialNumFile(const vespalib::string &sourceDir,
+ const vespalib::string &destDir)
+{
+ vespalib::string source = IndexDiskLayout::getSerialNumFileName(sourceDir);
+ vespalib::string dest = IndexDiskLayout::getSerialNumFileName(destDir);
+ vespalib::string tmpDest = dest + ".tmp";
+ if (!FastOS_FileInterface::CopyFile(source.c_str(), tmpDest.c_str())) {
+ LOG(error, "Unable to copy file '%s'", source.c_str());
+ return false;
+ }
+ FastOS_File file(tmpDest.c_str());
+ if (!file.OpenReadWrite()) {
+ LOG(error, "Unable to open '%s' for fsync", tmpDest.c_str());
+ return false;
+ }
+ if (!file.Sync()) {
+ LOG(error, "Unable to fsync '%s'", tmpDest.c_str());
+ return false;
+ }
+ if (!file.Close()) {
+ LOG(error, "Unable to close '%s'", tmpDest.c_str());
+ return false;
+ }
+ vespalib::File::sync(destDir);
+ if (!file.Rename(dest.c_str())) {
+ LOG(error, "Unable to rename file '%s' to '%s'", tmpDest.c_str(), dest.c_str());
+ return false;
+ }
+ vespalib::File::sync(destDir);
+ return true;
+}
+
+void
+IndexWriteUtilities::writeSourceSelector(FixedSourceSelector::SaveInfo &
+ saveInfo,
+ uint32_t sourceId,
+ const TuneFileAttributes &
+ tuneFileAttributes,
+ const FileHeaderContext &
+ fileHeaderContext,
+ SerialNum serialNum)
+{
+ SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext,
+ serialNum);
+ if (!saveInfo.save(tuneFileAttributes, snFileHeaderContext)) {
+ std::ostringstream msg;
+ msg << "Flush of sourceselector failed. Source id = " << sourceId;
+ throw IllegalStateException(msg.str());
+ }
+}
+
+void
+IndexWriteUtilities::updateDiskIndexSchema(const vespalib::string &indexDir,
+ const Schema &schema,
+ SerialNum serialNum)
+{
+ vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir);
+ Schema oldSchema;
+ if (!oldSchema.loadFromFile(schemaName)) {
+ LOG(error, "Could not open schema '%s'",
+ schemaName.c_str());
+ return;
+ }
+ if (!SchemaUtil::validateSchema(oldSchema)) {
+ LOG(error, "Could not validate schema loaded from '%s'",
+ schemaName.c_str());
+ return;
+ }
+ Schema::UP newSchema = Schema::intersect(oldSchema, schema);
+ if (*newSchema == oldSchema) {
+ return;
+ }
+ if (serialNum != noSerialNumHigh) {
+ SerialNum oldSerial = IndexReadUtilities::readSerialNum(indexDir);
+ if (oldSerial >= serialNum) {
+ return;
+ }
+ }
+ vespalib::string schemaTmpName = schemaName + ".tmp";
+ vespalib::string schemaOrigName = schemaName + ".orig";
+ vespalib::unlink(schemaTmpName);
+ if (!newSchema->saveToFile(schemaTmpName)) {
+ LOG(error, "Could not save schema to '%s'",
+ schemaTmpName.c_str());
+ }
+ // XXX: FastOS layer violation
+ FastOS_StatInfo statInfo;
+ bool statres;
+ statres = FastOS_File::Stat(schemaOrigName.c_str(), &statInfo);
+ if (!statres) {
+ if (statInfo._error != FastOS_StatInfo::FileNotFound) {
+ LOG(error, "Failed to stat orig schema '%s': %s",
+ schemaOrigName.c_str(),
+ FastOS_File::getLastErrorString().c_str());
+ }
+ int linkres = ::link(schemaName.c_str(), schemaOrigName.c_str());
+ if (linkres != 0) {
+ LOG(error, "Could not link '%s' to '%s': %s",
+ schemaOrigName.c_str(),
+ schemaName.c_str(),
+ FastOS_File::getLastErrorString().c_str());
+ }
+ vespalib::File::sync(indexDir);
+ }
+ // XXX: FastOS layer violation
+ int renameres = ::rename(schemaTmpName.c_str(), schemaName.c_str());
+ if (renameres != 0) {
+ int error = errno;
+ std::string errString = FastOS_File::getErrorString(error);
+ LOG(error, "Could not rename '%s' to '%s': %s",
+ schemaTmpName.c_str(),
+ schemaName.c_str(),
+ errString.c_str());
+ }
+ vespalib::File::sync(indexDir);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h
new file mode 100644
index 00000000000..313ab3cc1c7
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h
@@ -0,0 +1,44 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/searchlib/common/fileheadercontext.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Utility class with functions to write aspects of an index to disk.
+ * Used by the index maintainer.
+ */
+struct IndexWriteUtilities
+{
+ static void
+ writeSerialNum(search::SerialNum serialNum,
+ const vespalib::string &dir,
+ const search::common::FileHeaderContext &fileHeaderContext);
+
+ static bool
+ copySerialNumFile(const vespalib::string &sourceDir,
+ const vespalib::string &destDir);
+
+ static void
+ writeSourceSelector(search::FixedSourceSelector::SaveInfo &saveInfo,
+ uint32_t sourceId,
+ const search::TuneFileAttributes &tuneFileAttributes,
+ const search::common::FileHeaderContext &
+ fileHeaderContext,
+ search::SerialNum serialNum);
+
+ static void
+ updateDiskIndexSchema(const vespalib::string &indexDir,
+ const search::index::Schema &schema,
+ search::SerialNum serialNum);
+};
+
+}
+
+
diff --git a/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp
new file mode 100644
index 00000000000..85e87965cb7
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "isearchableindexcollection.h"
+#include <vespa/searchlib/queryeval/isourceselector.h>
+
+namespace searchcorespi {
+
+using search::queryeval::ISourceSelector;
+
+void
+ISearchableIndexCollection::setCurrentIndex(uint32_t id)
+{
+ assert( id < ISourceSelector::SOURCE_LIMIT);
+
+ _currentIndex = id;
+}
+
+uint32_t
+ISearchableIndexCollection::getCurrentIndex() const
+{
+ assert( valid() );
+
+ return _currentIndex;
+}
+
+bool
+ISearchableIndexCollection::valid() const
+{
+ return (_currentIndex > 0) && (_currentIndex < ISourceSelector::SOURCE_LIMIT);
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h
new file mode 100644
index 00000000000..efd7062af71
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexcollection.h"
+#include "indexsearchable.h"
+
+namespace searchcorespi {
+
+/**
+ * Interface to both an IndexCollection and to an IndexSearchable
+ */
+class ISearchableIndexCollection : public IIndexCollection,
+ public IndexSearchable {
+public:
+ ISearchableIndexCollection() : _currentIndex(-1) { }
+ using UP = std::unique_ptr<ISearchableIndexCollection>;
+ using SP = std::shared_ptr<ISearchableIndexCollection>;
+
+ virtual void append(uint32_t id, const IndexSearchable::SP &source) = 0;
+ virtual void replace(uint32_t id, const IndexSearchable::SP &source) = 0;
+ virtual IndexSearchable::SP getSearchableSP(uint32_t i) const = 0;
+ virtual void setSource(uint32_t docId) = 0;
+
+ void setCurrentIndex(uint32_t id);
+ uint32_t getCurrentIndex() const;
+ bool valid() const;
+
+private:
+ int32_t _currentIndex;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h
new file mode 100644
index 00000000000..c325d5ded11
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -0,0 +1,88 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "i_thread_service.h"
+
+class FNET_Transport;
+
+namespace vespalib {
+ class ISequencedTaskExecutor;
+ class Clock;
+}
+namespace searchcorespi::index {
+
+/**
+ * Interface for the thread model used for write tasks for a single document database.
+ *
+ * We have multiple write threads:
+ *
+ * 1. The "master" write thread used for the majority of write tasks.
+ *
+ * 2. The "index" write thread used for doing changes to the memory
+ * index, either directly (for data not bound to a field) or via
+ * index field inverter executor or index field writer executor.
+ *
+ * 3. The "summary" thread is used for doing changes to the document store.
+ *
+ * 4. The "index field inverter" executor is used to populate field
+ * inverters with data from document fields. Scheduled tasks for
+ * the same field are executed in sequence.
+ *
+ * 5. The "index field writer" executor is used to sort data in field
+ * inverters before pushing the data to the memory field indexes.
+ * Scheduled tasks for the same field are executed in sequence.
+ *
+ * 6. The "attribute field writer" executor is used to write data to attribute vectors.
+ * Each attribute is always handled by the same thread,
+ * and scheduled tasks for the same attribute are executed in sequence.
+ *
+ * The master write thread is always the one giving tasks to the other write threads above.
+ *
+ * In addition this interface exposes the "shared" executor that is used by all document databases.
+ * This is among others used for compressing / de-compressing documents in the document store,
+ * merging files as part of disk index fusion, and running the prepare step when doing two-phase
+ * puts against a tensor attribute with a HNSW index.
+ *
+ * The index write thread extracts fields from documents and gives
+ * task to the index field inverter executor and the index field
+ * writer executor.
+ *
+ * The index field inverter executor and index field writer executor
+ * are separate to allow for double buffering, i.e. populate one set
+ * of field inverters using the index field inverter executor while
+ * another set of field inverters are handled by the index field
+ * writer executor.
+ *
+ * We might decide to allow index field inverter tasks to schedule
+ * tasks to the index field writer executor, so draining logic needs
+ * to sync index field inverter executor before syncing index field
+ * writer executor.
+ *
+ * TODO: * indexFieldInverter and indexFieldWriter can be collapsed to one. Both need sequencing,
+ * but they sequence on different things so efficiency will be the same and just depends on #threads
+ */
+struct IThreadingService
+{
+ IThreadingService(const IThreadingService &) = delete;
+ IThreadingService & operator = (const IThreadingService &) = delete;
+ IThreadingService() = default;
+ virtual ~IThreadingService() = default;
+
+ /**
+ * Block the calling thread until the master thread has capacity to handle more tasks,
+ * and then execute the given task in the master thread.
+ */
+ virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0;
+
+ virtual ISyncableThreadService &master() = 0;
+ virtual IThreadService &index() = 0;
+ virtual vespalib::ThreadExecutor &summary() = 0;
+ virtual vespalib::Executor &shared() = 0;
+ virtual FNET_Transport &transport() = 0;
+ virtual const vespalib::Clock &clock() const = 0;
+ virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0;
+ virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0;
+ virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h b/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h
new file mode 100644
index 00000000000..ccc85ab4dc6
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h
@@ -0,0 +1,10 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "index_searchable_stats.h"
+
+namespace searchcorespi::index {
+
+using MemoryIndexStats = IndexSearchableStats;
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/warmupconfig.h b/searchcore/src/vespa/searchcorespi/index/warmupconfig.h
new file mode 100644
index 00000000000..8582b7256bc
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/warmupconfig.h
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/time.h>
+
+namespace searchcorespi::index {
+
+/**
+ * Keeps all config for controlling warmup.
+ **/
+class WarmupConfig {
+public:
+ WarmupConfig() : _duration(vespalib::duration::zero()), _unpack(false) { }
+ WarmupConfig(vespalib::duration duration, bool unpack) : _duration(duration), _unpack(unpack) { }
+ vespalib::duration getDuration() const { return _duration; }
+ bool getUnpack() const { return _unpack; }
+private:
+ const vespalib::duration _duration;
+ const bool _unpack;
+};
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp
new file mode 100644
index 00000000000..bf437dd7ee3
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp
@@ -0,0 +1,269 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "warmupindexcollection.h"
+#include "idiskindex.h"
+#include <vespa/searchlib/fef/matchdatalayout.h>
+#include <vespa/searchlib/query/tree/termnodes.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/stllike/hash_set.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/eval/eval/value.h>
+#include <thread>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.warmupindexcollection");
+
+namespace searchcorespi {
+
+using index::IDiskIndex;
+using search::fef::MatchDataLayout;
+using search::index::FieldLengthInfo;
+using search::query::StringBase;
+using search::queryeval::Blueprint;
+using search::queryeval::ISourceSelector;
+using search::queryeval::SearchIterator;
+using TermMap = vespalib::hash_set<vespalib::string>;
+
+class FieldTermMap : public vespalib::hash_map<uint32_t, TermMap>
+{
+
+};
+
+WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig,
+ ISearchableIndexCollection::SP prev,
+ ISearchableIndexCollection::SP next,
+ IndexSearchable & warmup,
+ vespalib::Executor & executor,
+ const vespalib::Clock & clock,
+ IWarmupDone & warmupDone) :
+ _warmupConfig(warmupConfig),
+ _prev(std::move(prev)),
+ _next(std::move(next)),
+ _warmup(warmup),
+ _executor(executor),
+ _clock(clock),
+ _warmupDone(warmupDone),
+ _warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()),
+ _handledTerms(std::make_unique<FieldTermMap>()),
+ _pendingTasks()
+{
+ if (_next->valid()) {
+ setCurrentIndex(_next->getCurrentIndex());
+ } else {
+ LOG(warning, "Next index is not valid, Dangerous !! : %s", _next->toString().c_str());
+ }
+ LOG(debug, "For %g seconds I will warm up '%s' %s unpack.", vespalib::to_s(warmupConfig.getDuration()), typeid(_warmup).name(), warmupConfig.getUnpack() ? "with" : "without");
+ LOG(debug, "%s", toString().c_str());
+}
+
+void
+WarmupIndexCollection::setSource(uint32_t docId)
+{
+ assert(_prev->valid());
+ assert(_next->valid());
+ _prev->setSource(docId);
+ _next->setSource(docId);
+}
+
+vespalib::string
+WarmupIndexCollection::toString() const
+{
+ vespalib::asciistream os;
+ os << "warmup : ";
+ if (dynamic_cast<const IDiskIndex *>(&_warmup) != nullptr) {
+ os << static_cast<const IDiskIndex &>(_warmup).getIndexDir();
+ } else {
+ os << typeid(_warmup).name();
+ }
+ os << "\n";
+ os << "next : " << _next->toString() << "\n";
+ os << "prev : " << _prev->toString() << "\n";
+ return os.str();
+}
+
+WarmupIndexCollection::~WarmupIndexCollection()
+{
+ if (_warmupEndTime != vespalib::steady_time()) {
+ LOG(info, "Warmup aborted due to new state change or application shutdown");
+ }
+ assert(_pendingTasks.has_zero_ref_count());
+}
+
+const ISourceSelector &
+WarmupIndexCollection::getSourceSelector() const
+{
+ return _next->getSourceSelector();
+}
+
+size_t
+WarmupIndexCollection::getSourceCount() const
+{
+ return _next->getSourceCount();
+}
+
+IndexSearchable &
+WarmupIndexCollection::getSearchable(uint32_t i) const
+{
+ return _next->getSearchable(i);
+}
+
+uint32_t
+WarmupIndexCollection::getSourceId(uint32_t i) const
+{
+ return _next->getSourceId(i);
+}
+
+void
+WarmupIndexCollection::fireWarmup(Task::UP task)
+{
+ vespalib::steady_time now(vespalib::steady_clock::now());
+ if (now < _warmupEndTime) {
+ _executor.execute(std::move(task));
+ } else {
+ std::unique_lock<std::mutex> guard(_lock);
+ if (_warmupEndTime != vespalib::steady_time()) {
+ _warmupEndTime = vespalib::steady_time();
+ guard.unlock();
+ LOG(info, "Done warming up. Posting WarmupDoneTask");
+ _warmupDone.warmupDone(shared_from_this());
+ }
+ }
+}
+
+bool
+WarmupIndexCollection::handledBefore(uint32_t fieldId, const Node &term)
+{
+ const StringBase * sb(dynamic_cast<const StringBase *>(&term));
+ if (sb != nullptr) {
+ const vespalib::string & s = sb->getTerm();
+ std::lock_guard<std::mutex> guard(_lock);
+ TermMap::insert_result found = (*_handledTerms)[fieldId].insert(s);
+ return ! found.second;
+ }
+ return true;
+}
+Blueprint::UP
+WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term)
+{
+ FieldSpecList fsl;
+ fsl.add(field);
+ return createBlueprint(requestContext, fsl,term);
+}
+
+Blueprint::UP
+WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term)
+{
+ if ( _warmupEndTime == vespalib::steady_time()) {
+ // warmup done
+ return _next->createBlueprint(requestContext, fields, term);
+ }
+ MatchDataLayout mdl;
+ FieldSpecList fsl;
+ bool needWarmUp(false);
+ for(size_t i(0); i < fields.size(); i++) {
+ const FieldSpec & f(fields[i]);
+ FieldSpec fs(f.getName(), f.getFieldId(), mdl.allocTermField(f.getFieldId()), f.isFilter());
+ fsl.add(fs);
+ needWarmUp = needWarmUp || ! handledBefore(fs.getFieldId(), term);
+ }
+ if (needWarmUp) {
+ auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), shared_from_this());
+ task->createBlueprint(fsl, term);
+ fireWarmup(std::move(task));
+ }
+ return _prev->createBlueprint(requestContext, fields, term);
+}
+
+search::SearchableStats
+WarmupIndexCollection::getSearchableStats() const
+{
+ return _prev->getSearchableStats();
+}
+
+
+search::SerialNum
+WarmupIndexCollection::getSerialNum() const
+{
+ return std::max(_prev->getSerialNum(), _next->getSerialNum());
+}
+
+
+void
+WarmupIndexCollection::accept(IndexSearchableVisitor &visitor) const
+{
+ _prev->accept(visitor);
+ _next->accept(visitor);
+}
+
+FieldLengthInfo
+WarmupIndexCollection::get_field_length_info(const vespalib::string& field_name) const
+{
+ return _next->get_field_length_info(field_name);
+}
+
+void
+WarmupIndexCollection::append(uint32_t id, const IndexSearchable::SP &source)
+{
+ _next->append(id, source);
+}
+
+void
+WarmupIndexCollection::replace(uint32_t id, const IndexSearchable::SP &source)
+{
+ _next->replace(id, source);
+}
+
+IndexSearchable::SP
+WarmupIndexCollection::getSearchableSP(uint32_t i) const
+{
+ return _next->getSearchableSP(i);
+}
+
+void
+WarmupIndexCollection::drainPending() {
+ _pendingTasks.waitForZeroRefCount();
+}
+
+WarmupIndexCollection::WarmupRequestContext::WarmupRequestContext(const vespalib::Clock & clock)
+ : _doom(clock, vespalib::steady_time::max(), vespalib::steady_time::max(), false)
+{}
+WarmupIndexCollection::WarmupRequestContext::~WarmupRequestContext() = default;
+
+std::unique_ptr<vespalib::eval::Value>
+WarmupIndexCollection::WarmupRequestContext::get_query_tensor(const vespalib::string&) const {
+ return {};
+}
+WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup)
+ : _warmup(std::move(warmup)),
+ _retainGuard(_warmup->_pendingTasks),
+ _matchData(std::move(md)),
+ _bluePrint(),
+ _requestContext(_warmup->_clock)
+{
+}
+
+WarmupIndexCollection::WarmupTask::~WarmupTask() = default;
+
+void
+WarmupIndexCollection::WarmupTask::run()
+{
+ if (_warmup->_warmupEndTime != vespalib::steady_time()) {
+ LOG(debug, "Warming up %s", _bluePrint->asString().c_str());
+ _bluePrint->fetchPostings(search::queryeval::ExecuteInfo::TRUE);
+ SearchIterator::UP it(_bluePrint->createSearch(*_matchData, true));
+ it->initFullRange();
+ for (uint32_t docId = it->seekFirst(1); !it->isAtEnd(); docId = it->seekNext(docId+1)) {
+ if (_warmup->doUnpack()) {
+ it->unpack(docId);
+ }
+ }
+ } else {
+ LOG(debug, "Warmup has finished, ignoring task.");
+ }
+}
+
+}
diff --git a/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h
new file mode 100644
index 00000000000..c2d70b4fd5c
--- /dev/null
+++ b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h
@@ -0,0 +1,128 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "isearchableindexcollection.h"
+#include "warmupconfig.h"
+#include <vespa/searchlib/attribute/attribute_blueprint_params.h>
+#include <vespa/vespalib/util/doom.h>
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/monitored_refcount.h>
+#include <vespa/vespalib/util/retain_guard.h>
+
+namespace searchcorespi {
+
+class FieldTermMap;
+class WarmupIndexCollection;
+
+class IWarmupDone {
+public:
+ virtual ~IWarmupDone() { }
+ virtual void warmupDone(std::shared_ptr<WarmupIndexCollection> current) = 0;
+};
+/**
+ * Index collection that holds a reference to the active one and a new one that
+ * is to be warmed up.
+ */
+class WarmupIndexCollection : public ISearchableIndexCollection,
+ public std::enable_shared_from_this<WarmupIndexCollection>
+{
+ using WarmupConfig = index::WarmupConfig;
+public:
+ typedef std::shared_ptr<WarmupIndexCollection> SP;
+ WarmupIndexCollection(const WarmupConfig & warmupConfig,
+ ISearchableIndexCollection::SP prev,
+ ISearchableIndexCollection::SP next,
+ IndexSearchable & warmup,
+ vespalib::Executor & executor,
+ const vespalib::Clock & clock,
+ IWarmupDone & warmupDone);
+ ~WarmupIndexCollection() override;
+ // Implements IIndexCollection
+ const ISourceSelector &getSourceSelector() const override;
+ size_t getSourceCount() const override;
+ IndexSearchable &getSearchable(uint32_t i) const override;
+ uint32_t getSourceId(uint32_t i) const override;
+
+ // Implements IndexSearchable
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term) override;
+ Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term) override;
+ search::SearchableStats getSearchableStats() const override;
+ search::SerialNum getSerialNum() const override;
+ void accept(IndexSearchableVisitor &visitor) const override;
+
+ // Implements IFieldLengthInspector
+ search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override;
+
+ // Implements ISearchableIndexCollection
+ void append(uint32_t id, const IndexSearchable::SP &source) override;
+ void replace(uint32_t id, const IndexSearchable::SP &source) override;
+ IndexSearchable::SP getSearchableSP(uint32_t i) const override;
+ void setSource(uint32_t docId) override;
+
+ const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; }
+ vespalib::string toString() const override;
+ bool doUnpack() const { return _warmupConfig.getUnpack(); }
+ void drainPending();
+private:
+ typedef search::fef::MatchData MatchData;
+ typedef vespalib::Executor::Task Task;
+ class WarmupRequestContext : public IRequestContext {
+ using IAttributeVector = search::attribute::IAttributeVector;
+ using AttributeBlueprintParams = search::attribute::AttributeBlueprintParams;
+ public:
+ WarmupRequestContext(const vespalib::Clock & clock);
+ ~WarmupRequestContext() override;
+ const vespalib::Doom & getDoom() const override { return _doom; }
+ const IAttributeVector *getAttribute(const vespalib::string &) const override { return nullptr; }
+ const IAttributeVector *getAttributeStableEnum(const vespalib::string &) const override { return nullptr; }
+ std::unique_ptr<vespalib::eval::Value> get_query_tensor(const vespalib::string&) const override;
+ const AttributeBlueprintParams& get_attribute_blueprint_params() const override { return _params; }
+ private:
+ const vespalib::Doom _doom;
+ const AttributeBlueprintParams _params;
+ };
+ class WarmupTask : public Task {
+ public:
+ WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup);
+ ~WarmupTask() override;
+ WarmupTask &createBlueprint(const FieldSpec &field, const Node &term) {
+ _bluePrint = _warmup->createBlueprint(_requestContext, field, term);
+ return *this;
+ }
+ WarmupTask &createBlueprint(const FieldSpecList &fields, const Node &term) {
+ _bluePrint = _warmup->createBlueprint(_requestContext, fields, term);
+ return *this;
+ }
+ private:
+ void run() override;
+ std::shared_ptr<WarmupIndexCollection> _warmup;
+ vespalib::RetainGuard _retainGuard;
+ std::unique_ptr<MatchData> _matchData;
+ Blueprint::UP _bluePrint;
+ WarmupRequestContext _requestContext;
+ };
+
+ void fireWarmup(Task::UP task);
+ bool handledBefore(uint32_t fieldId, const Node &term);
+
+ const WarmupConfig _warmupConfig;
+ ISearchableIndexCollection::SP _prev;
+ ISearchableIndexCollection::SP _next;
+ IndexSearchable & _warmup;
+ vespalib::Executor & _executor;
+ const vespalib::Clock & _clock;
+ IWarmupDone & _warmupDone;
+ vespalib::steady_time _warmupEndTime;
+ std::mutex _lock;
+ std::unique_ptr<FieldTermMap> _handledTerms;
+ vespalib::MonitoredRefCount _pendingTasks;
+};
+
+} // namespace searchcorespi