diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-15 11:50:08 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-15 11:50:46 +0000 |
commit | 81fb601117642a0e531630cc504cb4a6855d27df (patch) | |
tree | 093d429e9e791df3870aa84a3601489c2f214351 /searchcore | |
parent | fb35d47f9dfdc0cab773b86a496e010877d72afe (diff) |
Collapse searchcorespi into searchcore
Diffstat (limited to 'searchcore')
87 files changed, 5905 insertions, 10 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index 7fb98cb2514..8df693e8ea2 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -15,7 +15,6 @@ vespa_define_module( messagebus documentapi persistence - searchcorespi searchsummary fileacquirer @@ -41,6 +40,9 @@ vespa_define_module( src/vespa/searchcore/proton/server src/vespa/searchcore/proton/summaryengine src/vespa/searchcore/proton/test + src/vespa/searchcorespi + src/vespa/searchcorespi/flush + src/vespa/searchcorespi/index APPS src/apps/proton @@ -158,6 +160,8 @@ vespa_define_module( src/tests/proton/statusreport src/tests/proton/summaryengine src/tests/proton/verify_ranksetup + src/tests/index/disk_indexes + src/tests/index/index_disk_layout TEST_DEPENDS messagebus_messagebus-test diff --git a/searchcore/src/tests/index/disk_indexes/CMakeLists.txt b/searchcore/src/tests/index/disk_indexes/CMakeLists.txt new file mode 100644 index 00000000000..81b6bb0e8a9 --- /dev/null +++ b/searchcore/src/tests/index/disk_indexes/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcorespi_disk_indexes_test_app + SOURCES + disk_indexes_test.cpp + DEPENDS + searchcorespi + GTest::GTest +) +vespa_add_test(NAME searchcorespi_disk_indexes_test_app COMMAND searchcorespi_disk_indexes_test_app) diff --git a/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp b/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp new file mode 100644 index 00000000000..d22ad499316 --- /dev/null +++ b/searchcore/src/tests/index/disk_indexes/disk_indexes_test.cpp @@ -0,0 +1,196 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchcorespi/index/disk_indexes.h> +#include <vespa/searchcorespi/index/index_disk_dir.h> +#include <vespa/searchcorespi/index/indexdisklayout.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <fstream> + +namespace { + +vespalib::string base_dir("base"); + +constexpr uint32_t block_size = 4_Ki; + +} + +namespace searchcorespi::index { + +class DiskIndexesTest : public ::testing::Test, + public DiskIndexes +{ + IndexDiskLayout _layout; +protected: + DiskIndexesTest(); + ~DiskIndexesTest(); + + static IndexDiskDir get_index_disk_dir(const vespalib::string& dir) { + return IndexDiskLayout::get_index_disk_dir(dir); + } + + void assert_transient_size(uint64_t exp, IndexDiskDir index_disk_dir) { + EXPECT_EQ(exp, get_transient_size(_layout, index_disk_dir)); + } +}; + +DiskIndexesTest::DiskIndexesTest() + : ::testing::Test(), + DiskIndexes(), + _layout(base_dir) +{ +} + +DiskIndexesTest::~DiskIndexesTest() = default; + +TEST_F(DiskIndexesTest, simple_set_active_works) +{ + EXPECT_FALSE(isActive("index.flush.1")); + setActive("index.flush.1", 0); + EXPECT_TRUE(isActive("index.flush.1")); + notActive("index.flush.1"); + EXPECT_FALSE(isActive("index.flush.1")); +} + +TEST_F(DiskIndexesTest, nested_set_active_works) +{ + setActive("index.flush.1", 0); + setActive("index.flush.1", 0); + EXPECT_TRUE(isActive("index.flush.1")); + notActive("index.flush.1"); + EXPECT_TRUE(isActive("index.flush.1")); + notActive("index.flush.1"); + EXPECT_FALSE(isActive("index.flush.1")); +} + +TEST_F(DiskIndexesTest, is_active_returns_false_for_bad_name) +{ + EXPECT_FALSE(isActive("foo/bar/baz")); + EXPECT_FALSE(isActive("index.flush.0")); +} + +TEST_F(DiskIndexesTest, remove_works) +{ + EXPECT_TRUE(remove(IndexDiskDir())); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + EXPECT_TRUE(remove(fusion1)); + add_not_active(fusion1); + EXPECT_TRUE(remove(fusion1)); + setActive("index.fusion.1", 0); + EXPECT_FALSE(remove(fusion1)); + notActive("index.fusion.1"); + EXPECT_TRUE(remove(fusion1)); +} + +TEST_F(DiskIndexesTest, basic_get_transient_size_works) +{ + /* + * When starting to use a new fusion index, we have a transient + * period with two ISearchableIndexCollection instances: + * - old, containing index.fusion.1 and index.flush.2 + * - new, containing index.fusion.2 + */ + setActive("index.fusion.1", 1000000); + setActive("index.flush.2", 500000); + setActive("index.fusion.2", 1200000); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + auto flush2 = get_index_disk_dir("index.flush.2"); + auto fusion2 = get_index_disk_dir("index.fusion.2"); + { + /* + * When using the old index collection, disk space used by + * index.fusion.2 is considered transient. + */ + SCOPED_TRACE("index.fusion.1"); + assert_transient_size(1200000, fusion1); + } + { + SCOPED_TRACE("index.flush.2"); + assert_transient_size(0, flush2); + } + { + /* + * When using the new index collection, disk space used by + * index.fusion.1 and index.flush.2 is considered transient. + */ + SCOPED_TRACE("index.fusion.2"); + assert_transient_size(1500000, fusion2); + } + notActive("index.fusion.1"); + notActive("index.flush.2"); + { + /* + * old index collection removed. + */ + SCOPED_TRACE("index.fusion.2 after remove of index.fusion.1 and index.flush.1"); + assert_transient_size(0, fusion2); + } +} + +TEST_F(DiskIndexesTest, get_transient_size_during_ongoing_fusion) +{ + /* + * During ongoing fusion, we have one ISearchableIndexCollection instance: + * - old, containing index.fusion.1 and index.flush.2 + * + * Fusion output directory is index.fusion.2 + */ + setActive("index.fusion.1", 1000000); + setActive("index.flush.2", 500000); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + auto fusion2 = get_index_disk_dir("index.fusion.2"); + add_not_active(fusion2); // start tracking disk space for fusion output + { + /* + * Fusion not yet started. + */ + SCOPED_TRACE("dir missing"); + assert_transient_size(0, fusion1); + } + auto dir = base_dir + "/index.fusion.2"; + vespalib::mkdir(dir, true); + { + /* + * Fusion started, but no files written yet. + */ + SCOPED_TRACE("empty dir"); + assert_transient_size(0, fusion1); + } + constexpr uint32_t seek_pos = 999999; + { + std::string name = dir + "/foo"; + std::ofstream ostr(name, std::ios::binary); + ostr.seekp(seek_pos); + ostr.write(" ", 1); + ostr.flush(); + ostr.close(); + } + { + /* + * Fusion started, one file written. + */ + SCOPED_TRACE("single file"); + assert_transient_size((seek_pos + block_size) / block_size * block_size, fusion1); + } + EXPECT_TRUE(remove(fusion2)); // stop tracking disk space for fusion output + { + /* + * Fusion aborted. + */ + SCOPED_TRACE("removed"); + assert_transient_size(0, fusion1); + } +} + +} + +int +main(int argc, char* argv[]) +{ + vespalib::rmdir(base_dir, true); + ::testing::InitGoogleTest(&argc, argv); + auto result = RUN_ALL_TESTS(); + vespalib::rmdir(base_dir, true); + return result; +} diff --git a/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt b/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt new file mode 100644 index 00000000000..4e82cf1b9d2 --- /dev/null +++ b/searchcore/src/tests/index/index_disk_layout/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcorespi_index_disk_layout_test_app + SOURCES + index_disk_layout_test.cpp + DEPENDS + searchcorespi + GTest::GTest +) +vespa_add_test(NAME searchcorespi_index_disk_layout_test_app COMMAND searchcorespi_index_disk_layout_test_app) diff --git a/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp b/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp new file mode 100644 index 00000000000..e35225b2745 --- /dev/null +++ b/searchcore/src/tests/index/index_disk_layout/index_disk_layout_test.cpp @@ -0,0 +1,60 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchcorespi/index/indexdisklayout.h> +#include <vespa/searchcorespi/index/index_disk_dir.h> +#include <vespa/vespalib/gtest/gtest.h> + +namespace searchcorespi::index { + +namespace { + +void expect_index_disk_dir(IndexDiskDir exp, const vespalib::string& dir) +{ + auto act = IndexDiskLayout::get_index_disk_dir(dir); + ASSERT_TRUE(act.valid()); + ASSERT_EQ(exp, act); +} + +void expect_bad_index_disk_dir(const vespalib::string& dir) +{ + auto act = IndexDiskLayout::get_index_disk_dir(dir); + ASSERT_FALSE(act.valid()); +} + +} + +TEST(IndexDiskLayoutTest, get_index_disk_dir_works) +{ + { + SCOPED_TRACE("index.fusion.1"); + expect_index_disk_dir(IndexDiskDir(1, true), "index.fusion.1"); + } + { + SCOPED_TRACE("index.flush.2"); + expect_index_disk_dir(IndexDiskDir(2, false), "index.flush.2"); + } + { + SCOPED_TRACE("index.flush.3"); + expect_index_disk_dir(IndexDiskDir(3, false), "index.flush.3"); + } + { + SCOPED_TRACE("foo/bar/index.flush.4"); + expect_index_disk_dir(IndexDiskDir(4, false), "foo/bar/index.flush.4"); + } + { + SCOPED_TRACE("index.flush."); + expect_bad_index_disk_dir("index.flush."); + } + { + SCOPED_TRACE("index.flush.0"); + expect_bad_index_disk_dir("index.flush.0"); + } + { + SCOPED_TRACE("asdf"); + expect_bad_index_disk_dir("asdf"); + } +} + +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/flushengine/CMakeLists.txt b/searchcore/src/tests/proton/flushengine/CMakeLists.txt index b45c4fa411c..a47b3f2c93f 100644 --- a/searchcore/src/tests/proton/flushengine/CMakeLists.txt +++ b/searchcore/src/tests/proton/flushengine/CMakeLists.txt @@ -5,6 +5,7 @@ vespa_add_executable(searchcore_flushengine_test_app TEST DEPENDS searchcore_flushengine searchcore_pcommon + searchcore_test ) vespa_add_test( NAME searchcore_flushengine_test_app diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt index 9f0c777a4d7..608f80e38aa 100644 --- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt +++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/CMakeLists.txt @@ -3,8 +3,8 @@ vespa_add_executable(searchcore_flushengine_prepare_restart_flush_strategy_test_ SOURCES prepare_restart_flush_strategy_test.cpp DEPENDS - searchcorespi searchcore_flushengine + searchcore_test ) vespa_add_test( NAME searchcore_flushengine_prepare_restart_flush_strategy_test_app diff --git a/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt b/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt index 4a5dd2acb9d..aad11581e2f 100644 --- a/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt +++ b/searchcore/src/tests/proton/metrics/documentdb_job_trackers/CMakeLists.txt @@ -4,5 +4,6 @@ vespa_add_executable(searchcore_documentdb_job_trackers_test_app TEST documentdb_job_trackers_test.cpp DEPENDS searchcore_proton_metrics + searchcore_test ) vespa_add_test(NAME searchcore_documentdb_job_trackers_test_app COMMAND searchcore_documentdb_job_trackers_test_app) diff --git a/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt b/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt index 6ab08602f67..4467bea4ab1 100644 --- a/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt +++ b/searchcore/src/tests/proton/metrics/job_tracked_flush/CMakeLists.txt @@ -4,5 +4,6 @@ vespa_add_executable(searchcore_job_tracked_flush_test_app TEST job_tracked_flush_test.cpp DEPENDS searchcore_proton_metrics + searchcore_test ) vespa_add_test(NAME searchcore_job_tracked_flush_test_app COMMAND searchcore_job_tracked_flush_test_app) diff --git a/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt b/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt index 80c5521c763..207883aab0e 100644 --- a/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt +++ b/searchcore/src/tests/proton/server/memoryflush/CMakeLists.txt @@ -5,5 +5,6 @@ vespa_add_executable(searchcore_memoryflush_test_app TEST DEPENDS searchcore_server searchcore_flushengine + searchcore_test ) vespa_add_test(NAME searchcore_memoryflush_test_app COMMAND searchcore_memoryflush_test_app) diff --git a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt index 60d3a05502a..501c5e468dd 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/bmcluster/CMakeLists.txt @@ -49,6 +49,7 @@ vespa_add_library(searchcore_bmcluster STATIC searchcore_grouping searchcore_proton_metrics searchcore_fconfig + searchcorespi storageserver_storageapp messagebus_messagebus-test messagebus diff --git a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt index 5f288a116b4..ff740f7a4ae 100644 --- a/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/documentmetastore/CMakeLists.txt @@ -20,4 +20,5 @@ vespa_add_library(searchcore_documentmetastore STATIC searchcore_attribute searchcore_bucketdb searchcore_initializer + searchcorespi ) diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt index b506e889a97..a0e69af5b1e 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt @@ -16,4 +16,5 @@ vespa_add_library(searchcore_flushengine STATIC tls_stats_factory.cpp tls_stats_map.cpp DEPENDS + searchcorespi ) diff --git a/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt index 7f5ece11366..ee9f0caded1 100644 --- a/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/index/CMakeLists.txt @@ -7,4 +7,5 @@ vespa_add_library(searchcore_index STATIC indexmanager.cpp memoryindexwrapper.cpp DEPENDS + searchcorespi ) diff --git a/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt index 41e2fe2105f..1c203dd1284 100644 --- a/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/matching/CMakeLists.txt @@ -43,4 +43,5 @@ vespa_add_library(searchcore_matching STATIC viewresolver.cpp DEPENDS searchcore_grouping + searchcorespi ) diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt index 3a2e443bfef..5df9060ea07 100644 --- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt @@ -8,6 +8,7 @@ vespa_add_library(searchcore_test STATIC clusterstatehandler.cpp documentdb_config_builder.cpp dummy_feed_view.cpp + dummy_flush_target.cpp mock_index_manager.cpp mock_shared_threading_service.cpp userdocumentsbuilder.cpp @@ -17,4 +18,5 @@ vespa_add_library(searchcore_test STATIC DEPENDS searchcore_server searchcore_fconfig + searchcorespi ) diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp new file mode 100644 index 00000000000..8915e3b367c --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.cpp @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "dummy_flush_target.h" + +namespace proton::test { + +DummyFlushTarget::DummyFlushTarget(const vespalib::string &name) noexcept + : searchcorespi::IFlushTarget(name) +{} +DummyFlushTarget::DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept + : searchcorespi::IFlushTarget(name, type, component) +{} +DummyFlushTarget::~DummyFlushTarget() = default; + +} diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h index 3689b181c52..a9206233c9d 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_flush_target.h @@ -7,14 +7,9 @@ namespace proton::test { struct DummyFlushTarget : public searchcorespi::IFlushTarget { - DummyFlushTarget(const vespalib::string &name) noexcept - : searchcorespi::IFlushTarget(name) - {} - DummyFlushTarget(const vespalib::string &name, - const Type &type, - const Component &component) noexcept - : searchcorespi::IFlushTarget(name, type, component) - {} + DummyFlushTarget(const vespalib::string &name) noexcept; + DummyFlushTarget(const vespalib::string &name, const Type &type, const Component &component) noexcept; + ~DummyFlushTarget() override; MemoryGain getApproxMemoryGain() const override { return MemoryGain(0, 0); } DiskGain getApproxDiskGain() const override { return DiskGain(0, 0); } SerialNum getFlushedSerialNum() const override { return 0; } diff --git a/searchcore/src/vespa/searchcorespi/.gitignore b/searchcore/src/vespa/searchcorespi/.gitignore new file mode 100644 index 00000000000..9d6ecd51398 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/.gitignore @@ -0,0 +1,3 @@ +/.depend +/Makefile +/libsearchcorespi.so.5.1 diff --git a/searchcore/src/vespa/searchcorespi/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/CMakeLists.txt new file mode 100644 index 00000000000..fab1d007a4f --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi STATIC + SOURCES + $<TARGET_OBJECTS:searchcorespi_flush> + $<TARGET_OBJECTS:searchcorespi_index> + DEPENDS +) diff --git a/searchcore/src/vespa/searchcorespi/flush/.gitignore b/searchcore/src/vespa/searchcorespi/flush/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt new file mode 100644 index 00000000000..b2777d4327a --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi_flush STATIC OBJECT + SOURCES + flushstats.cpp + DEPENDS +) diff --git a/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp b/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp new file mode 100644 index 00000000000..28632219a28 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/flushstats.cpp @@ -0,0 +1,13 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "flushstats.h" + +namespace searchcorespi { + +FlushStats::FlushStats() : + _path(), + _pathElementsToLog(6) +{ +} + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/flush/flushstats.h b/searchcore/src/vespa/searchcorespi/flush/flushstats.h new file mode 100644 index 00000000000..f92187b2112 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/flushstats.h @@ -0,0 +1,28 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { + +/** + * Class with stats for what have been flushed. + */ +class FlushStats +{ +private: + vespalib::string _path; // path to data flushed + size_t _pathElementsToLog; + +public: + FlushStats(); + + void setPath(const vespalib::string & path) { _path = path; } + void setPathElementsToLog(size_t numElems) { _pathElementsToLog = numElems; } + + const vespalib::string & getPath() const { return _path; } + size_t getPathElementsToLog() const { return _pathElementsToLog; } +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/flush/flushtask.h b/searchcore/src/vespa/searchcorespi/flush/flushtask.h new file mode 100644 index 00000000000..a4de3f65fff --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/flushtask.h @@ -0,0 +1,18 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/executor.h> +#include <vespa/searchlib/common/serialnum.h> + +namespace searchcorespi { + +class FlushTask : public vespalib::Executor::Task +{ +public: + typedef std::unique_ptr<FlushTask> UP; + + virtual search::SerialNum getFlushSerial() const = 0; +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h new file mode 100644 index 00000000000..dff6041d7d5 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/iflushtarget.h @@ -0,0 +1,190 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "flushstats.h" +#include "flushtask.h" +#include <vespa/vespalib/util/time.h> +#include <vector> + +namespace search { class IFlushToken; } + +namespace searchcorespi { + +/** + * This abstract class represents a flushable object that uses + * getApproxBytesBeforeFlush() bytes of memory, that will be reduced to + * getApproxBytesAfterFlush() if flushed. + */ +class IFlushTarget +{ +public: + /** + * The flush types that a flush target can represent. + */ + enum class Type { + FLUSH, + SYNC, + GC, + OTHER + }; + + /** + * The component types that a flush target can be used for. + */ + enum class Component { + ATTRIBUTE, + INDEX, + DOCUMENT_STORE, + OTHER + }; + +private: + vespalib::string _name; + Type _type; + Component _component; + +public: + template<typename T> + class Gain { + public: + Gain() noexcept : _before(0), _after(0) { } + Gain(T before, T after) noexcept : _before(before), _after(after) { } + T getBefore() const { return _before; } + T getAfter() const { return _after; } + T gain() const { return _before - _after; } + double gainRate() const { return (_before != 0) ? double(gain())/_before : 0;} + Gain & operator += (const Gain & b) { _before += b.getBefore(); _after += b.getAfter(); return *this; } + static Gain noGain(size_t currentSize) { return Gain(currentSize, currentSize); } + private: + T _before; + T _after; + }; + using MemoryGain = Gain<int64_t>; + using DiskGain = Gain<int64_t>; + using SerialNum = search::SerialNum; + using Time = vespalib::system_time; + + /** + * Convenience typedefs. + */ + typedef std::shared_ptr<IFlushTarget> SP; + typedef std::vector<SP> List; + typedef FlushTask Task; + + /** + * Constructs a new instance of this class. + * + * @param name The handler-wide unique name of this target. + */ + IFlushTarget(const vespalib::string &name) noexcept + : _name(name), + _type(Type::OTHER), + _component(Component::OTHER) + { } + + /** + * Constructs a new instance of this class. + * + * @param name The handler-wide unique name of this target. + * @param type The flush type of this target. + * @param component The component type of this target. + */ + IFlushTarget(const vespalib::string &name, + const Type &type, + const Component &component) noexcept + : _name(name), + _type(type), + _component(component) + { } + + /** + * Virtual destructor required for inheritance. + */ + virtual ~IFlushTarget() = default; + + /** + * Returns the handler-wide unique name of this target. + * + * @return The name of this. + */ + const vespalib::string & getName() const { return _name; } + + /** + * Returns the flush type of this target. + */ + Type getType() const { return _type; } + + /** + * Returns the component type of this target. + */ + Component getComponent() const { return _component; } + + /** + * Returns the approximate memory gain of this target, in bytes. + * + * @return The gain + */ + virtual MemoryGain getApproxMemoryGain() const = 0; + + /** + * Returns the approximate memory gain of this target, in bytes. + * + * @return The gain + */ + virtual DiskGain getApproxDiskGain() const = 0; + + /** + * Returns the approximate amount of bytes this target writes to disk if flushed. + */ + virtual uint64_t getApproxBytesToWriteToDisk() const = 0; + + /** + * Return cost of replaying a feed operation relative to cost of reading a feed operation from tls. + */ + virtual double get_replay_operation_cost() const { return 0.0; } + + /** + * Returns the last serial number for the transaction applied to + * target before it was flushed to disk. The transaction log can + * not be pruned beyond this. + * + * @return The last serial number represented in flushed target + */ + virtual SerialNum getFlushedSerialNum() const = 0; + + /** + * Returns the time of last flush. + * + * @return The last flush time. + */ + virtual Time getLastFlushTime() const = 0; + + /** + * Return if the target itself is in bad need for a flush. + * + * @return true if an urgent flush is needed + */ + virtual bool needUrgentFlush() const { return false; } + + /** + * Initiates the flushing of temporary memory. This method must perform + * everything required to allow another thread to complete the flush. This + * method is called by the flush scheduler thread. + * + * @param currentSerial The current transaction serial number. + * @return The task used to complete the flush. + */ + virtual Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) = 0; + + /** + * Returns the stats for the last completed flush operation + * for this flush target. + * + * @return The stats for the last flush. + */ + virtual FlushStats getLastFlushStats() const = 0; + +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h b/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h new file mode 100644 index 00000000000..75737ce73d5 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/flush/lambdaflushtask.h @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "flushtask.h" + +namespace searchcorespi { + +template <class FunctionType> +class LambdaFlushTask : public FlushTask { + FunctionType _func; + search::SerialNum _flushSerial; + +public: + LambdaFlushTask(FunctionType &&func, search::SerialNum flushSerial) + : _func(std::move(func)), + _flushSerial(flushSerial) + {} + ~LambdaFlushTask() override = default; + search::SerialNum getFlushSerial() const override { return _flushSerial; } + void run() override { _func(); } +}; + +template <class FunctionType> +std::unique_ptr<FlushTask> +makeLambdaFlushTask(FunctionType &&function, search::SerialNum flushSerial) +{ + return std::make_unique<LambdaFlushTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function), flushSerial); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/.gitignore b/searchcore/src/vespa/searchcorespi/index/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt b/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt new file mode 100644 index 00000000000..ca33131d7f4 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi_index STATIC OBJECT + SOURCES + diskindexcleaner.cpp + disk_indexes.cpp + disk_index_stats.cpp + eventlogger.cpp + fusionrunner.cpp + iindexmanager.cpp + iindexcollection.cpp + index_disk_dir_state.cpp + index_manager_explorer.cpp + index_manager_stats.cpp + indexcollection.cpp + indexdisklayout.cpp + indexflushtarget.cpp + indexfusiontarget.cpp + indexmaintainer.cpp + indexmaintainerconfig.cpp + indexmaintainercontext.cpp + indexmanagerconfig.cpp + indexreadutilities.cpp + index_searchable_stats.cpp + indexwriteutilities.cpp + warmupindexcollection.cpp + isearchableindexcollection.cpp + DEPENDS +) diff --git a/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp new file mode 100644 index 00000000000..1b77061de8c --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.cpp @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "disk_index_stats.h" +#include "idiskindex.h" + + +namespace searchcorespi::index { + +DiskIndexStats::DiskIndexStats() + : IndexSearchableStats(), + _indexDir() +{ +} + +DiskIndexStats::DiskIndexStats(const IDiskIndex &index) + : IndexSearchableStats(index), + _indexDir(index.getIndexDir()) +{ +} + +DiskIndexStats::~DiskIndexStats() +{ +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h new file mode 100644 index 00000000000..831d95e95c1 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/disk_index_stats.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "index_searchable_stats.h" +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +struct IDiskIndex; + +/** + * Information about a disk index usable by state explorer. + */ +class DiskIndexStats : public IndexSearchableStats { + vespalib::string _indexDir; +public: + DiskIndexStats(); + DiskIndexStats(const IDiskIndex &index); + ~DiskIndexStats(); + + const vespalib::string &getIndexdir() const { return _indexDir; } +}; + +} // namespace searchcorespi::index +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp b/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp new file mode 100644 index 00000000000..28f6a886d06 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/disk_indexes.cpp @@ -0,0 +1,131 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "disk_indexes.h" +#include "indexdisklayout.h" +#include "index_disk_dir.h" +#include "index_disk_dir_state.h" +#include <vespa/searchlib/util/dirtraverse.h> +#include <cassert> +#include <vector> + +using vespalib::string; + +namespace searchcorespi::index { + +DiskIndexes::DiskIndexes() = default; +DiskIndexes::~DiskIndexes() = default; + +void +DiskIndexes::setActive(const string &index, uint64_t size_on_disk) +{ + auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index); + assert(index_disk_dir.valid()); + std::lock_guard lock(_lock); + auto insres = _active.insert(std::make_pair(index_disk_dir, IndexDiskDirState())); + insres.first->second.activate(); + if (!insres.first->second.get_size_on_disk().has_value()) { + insres.first->second.set_size_on_disk(size_on_disk); + } +} + +void DiskIndexes::notActive(const string & index) { + auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index); + assert(index_disk_dir.valid()); + std::lock_guard lock(_lock); + auto it = _active.find(index_disk_dir); + assert(it != _active.end()); + assert(it->second.is_active()); + it->second.deactivate(); + if (!it->second.is_active()) { + _active.erase(it); + } +} + +bool DiskIndexes::isActive(const string &index) const { + auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index); + if (!index_disk_dir.valid()) { + return false; + } + std::lock_guard lock(_lock); + auto it = _active.find(index_disk_dir); + return (it != _active.end()) && it->second.is_active(); +} + + +void +DiskIndexes::add_not_active(IndexDiskDir index_disk_dir) +{ + std::lock_guard lock(_lock); + _active.insert(std::make_pair(index_disk_dir, IndexDiskDirState())); +} + +bool +DiskIndexes::remove(IndexDiskDir index_disk_dir) +{ + if (!index_disk_dir.valid()) { + return true; + } + std::lock_guard lock(_lock); + auto it = _active.find(index_disk_dir); + if (it == _active.end()) { + return true; + } + if (it->second.is_active()) { + return false; + } + _active.erase(it); + return true; +} + +uint64_t +DiskIndexes::get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const +{ + /* + * Only report transient size related to a valid fusion index. This ensures + * that transient size is reported once per index collection. + */ + if (!index_disk_dir.valid() || !index_disk_dir.is_fusion_index()) { + return 0u; + } + uint64_t transient_size = 0u; + std::vector<IndexDiskDir> deferred; + { + std::lock_guard lock(_lock); + for (auto &entry : _active) { + if (entry.first < index_disk_dir) { + /* + * Indexes before current fusion index are on the way out and + * will be removed when all older index collections + * referencing them are destroyed. Disk space used by these + * indexes is considered transient. + */ + if (entry.second.get_size_on_disk().has_value()) { + transient_size += entry.second.get_size_on_disk().value(); + } + } + if (index_disk_dir < entry.first && entry.first.is_fusion_index()) { + /* + * Fusion indexes after current fusion index can be partially + * complete and might be removed if fusion is aborted. Disk + * space used by these indexes is consider transient. + */ + if (entry.second.get_size_on_disk().has_value()) { + transient_size += entry.second.get_size_on_disk().value(); + } else { + deferred.emplace_back(entry.first); + } + } + } + } + for (auto& entry : deferred) { + auto index_dir = layout.getFusionDir(entry.get_id()); + try { + search::DirectoryTraverse dirt(index_dir.c_str()); + transient_size += dirt.GetTreeSize(); + } catch (std::exception &) { + } + } + return transient_size; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/disk_indexes.h b/searchcore/src/vespa/searchcorespi/index/disk_indexes.h new file mode 100644 index 00000000000..842c1814faf --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/disk_indexes.h @@ -0,0 +1,46 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <map> +#include <mutex> +#include <memory> + +namespace searchcorespi::index { + +class IndexDiskDir; +class IndexDiskDirState; +class IndexDiskLayout; + +/** + * Class used to keep track of the set of disk indexes in an index maintainer. + * The index directories are used as identifiers. + * + * DiskIndexCleaner will remove old disk indexes not marked active, + * i.e. old disk indexes used by old index collections are not removed. + * + * At start of fusion, an entry for fusion output index is added, to allow for + * tracking of transient disk use while fusion is ongoing. If fusion fails then + * the entry is removed, otherwise the entry is marked active as a side effect + * of setting up a new index collection. + */ +class DiskIndexes { + std::map<IndexDiskDir, IndexDiskDirState> _active; + mutable std::mutex _lock; + +public: + using SP = std::shared_ptr<DiskIndexes>; + DiskIndexes(); + ~DiskIndexes(); + DiskIndexes(const DiskIndexes &) = delete; + DiskIndexes & operator = (const DiskIndexes &) = delete; + void setActive(const vespalib::string & index, uint64_t size_on_disk); + void notActive(const vespalib::string & index); + bool isActive(const vespalib::string & index) const; + void add_not_active(IndexDiskDir index_disk_dir); + bool remove(IndexDiskDir index_disk_dir); + uint64_t get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp new file mode 100644 index 00000000000..3bed7ea8ea7 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.cpp @@ -0,0 +1,125 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "diskindexcleaner.h" +#include "disk_indexes.h" +#include "indexdisklayout.h" +#include "index_disk_dir.h" +#include <vespa/fastos/file.h> +#include <vespa/vespalib/io/fileutil.h> +#include <sstream> +#include <vector> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.diskindexcleaner"); + +using std::istringstream; +using vespalib::string; +using std::vector; + +namespace searchcorespi::index { + +namespace { +vector<string> readIndexes(const string &base_dir) { + vector<string> indexes; + FastOS_DirectoryScan dir_scan(base_dir.c_str()); + while (dir_scan.ReadNext()) { + string name = dir_scan.GetName(); + if (!dir_scan.IsDirectory() || name.find("index.") != 0) { + continue; + } + indexes.push_back(name); + } + return indexes; +} + +bool isValidIndex(const string &index_dir) { + FastOS_File serial_file((index_dir + "/serial.dat").c_str()); + return serial_file.OpenReadOnlyExisting(); +} + +void invalidateIndex(const string &index_dir) { + vespalib::unlink(index_dir + "/serial.dat"); + vespalib::File::sync(index_dir); +} + +uint32_t findLastFusionId(const string &base_dir, + const vector<string> &indexes) { + uint32_t fusion_id = 0; + const string prefix = "index.fusion."; + for (size_t i = 0; i < indexes.size(); ++i) { + if (indexes[i].find(prefix) != 0) { + continue; + } + if (!isValidIndex(base_dir + "/" + indexes[i])) { + continue; + } + + uint32_t new_id = 0; + istringstream ist(indexes[i].substr(prefix.size())); + ist >> new_id; + fusion_id = std::max(fusion_id, new_id); + } + return fusion_id; +} + +void removeDir(const string &dir) { + LOG(debug, "Removing index dir '%s'", dir.c_str()); + invalidateIndex(dir); + vespalib::rmdir(dir, true); +} + +bool isOldIndex(const string &index, uint32_t last_fusion_id) { + string::size_type pos = index.rfind("."); + istringstream ist(index.substr(pos + 1)); + uint32_t id = last_fusion_id; + ist >> id; + if (id < last_fusion_id) { + return true; + } else if (id == last_fusion_id) { + return index.find("flush") != string::npos; + } + return false; +} + +void removeOld(const string &base_dir, const vector<string> &indexes, + DiskIndexes &disk_indexes, bool remove) { + uint32_t last_fusion_id = findLastFusionId(base_dir, indexes); + for (size_t i = 0; i < indexes.size(); ++i) { + const string index_dir = base_dir + "/" + indexes[i]; + auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(indexes[i]); + if (isOldIndex(indexes[i], last_fusion_id) && + disk_indexes.remove(index_disk_dir)) { + if (remove) { + removeDir(index_dir); + } else { + invalidateIndex(index_dir); + } + } + } +} + +void removeInvalid(const string &base_dir, const vector<string> &indexes) { + for (size_t i = 0; i < indexes.size(); ++i) { + const string index_dir = base_dir + "/" + indexes[i]; + if (!isValidIndex(index_dir)) { + LOG(debug, "Found invalid index dir '%s'", index_dir.c_str()); + removeDir(index_dir); + } + } +} +} // namespace + +void DiskIndexCleaner::clean(const string &base_dir, + DiskIndexes &disk_indexes) { + vector<string> indexes = readIndexes(base_dir); + removeOld(base_dir, indexes, disk_indexes, false); + removeInvalid(base_dir, indexes); +} + +void DiskIndexCleaner::removeOldIndexes( + const string &base_dir, DiskIndexes &disk_indexes) { + vector<string> indexes = readIndexes(base_dir); + removeOld(base_dir, indexes, disk_indexes, true); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h new file mode 100644 index 00000000000..cbd3a5aa94f --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/diskindexcleaner.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { +class DiskIndexes; + +/** + * Utility class used to clean and remove index directories. + */ +struct DiskIndexCleaner { + /** + * Deletes all indexes with id lower than the most recent fusion id. + */ + static void clean(const vespalib::string &index_dir, + DiskIndexes& disk_indexes); + static void removeOldIndexes(const vespalib::string &index_dir, + DiskIndexes& disk_indexes); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp b/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp new file mode 100644 index 00000000000..7a5b1bf907a --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/eventlogger.cpp @@ -0,0 +1,69 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "eventlogger.h" +#include <vespa/searchlib/util/logutil.h> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.eventlogger"); + +using vespalib::JSONStringer; +using search::util::LogUtil; + +namespace searchcorespi::index { + +void +EventLogger::diskIndexLoadStart(const vespalib::string &indexDir) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("input"); + LogUtil::logDir(jstr, indexDir, 6); + jstr.endObject(); + EV_STATE("diskindex.load.start", jstr.toString().data()); +} + +void +EventLogger::diskIndexLoadComplete(const vespalib::string &indexDir, + int64_t elapsedTimeMs) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs); + jstr.appendKey("input"); + LogUtil::logDir(jstr, indexDir, 6); + jstr.endObject(); + EV_STATE("diskindex.load.complete", jstr.toString().data()); +} + +void +EventLogger::diskFusionStart(const std::vector<vespalib::string> &sources, + const vespalib::string &fusionDir) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("inputs"); + jstr.beginArray(); + for (size_t i = 0; i < sources.size(); ++i) { + LogUtil::logDir(jstr, sources[i], 6); + } + jstr.endArray(); + jstr.appendKey("output"); + LogUtil::logDir(jstr, fusionDir, 6); + jstr.endObject(); + EV_STATE("fusion.start", jstr.toString().data()); +} + +void +EventLogger::diskFusionComplete(const vespalib::string &fusionDir, + int64_t elapsedTimeMs) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs); + jstr.appendKey("output"); + LogUtil::logDir(jstr, fusionDir, 6); + jstr.endObject(); + EV_STATE("fusion.complete", jstr.toString().data()); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/eventlogger.h b/searchcore/src/vespa/searchcorespi/index/eventlogger.h new file mode 100644 index 00000000000..6191543dcb3 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/eventlogger.h @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <vector> + +namespace searchcorespi { +namespace index { + +/** + * Class used to log various events related to disk index handling. + **/ +struct EventLogger { + static void diskIndexLoadStart(const vespalib::string &indexDir); + static void diskIndexLoadComplete(const vespalib::string &indexDir, + int64_t elapsedTimeMs); + static void diskFusionStart(const std::vector<vespalib::string> &sources, + const vespalib::string &fusionDir); + static void diskFusionComplete(const vespalib::string &fusionDir, + int64_t elapsedTimeMs); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h b/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h new file mode 100644 index 00000000000..25a8b6847e9 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/fakeindexsearchable.h @@ -0,0 +1,50 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "indexsearchable.h" +#include <vespa/searchlib/queryeval/fake_searchable.h> + +namespace searchcorespi { + +/** + * A fake index searchable used for unit testing. + */ +class FakeIndexSearchable : public IndexSearchable { +private: + search::queryeval::FakeSearchable _fake; + +public: + FakeIndexSearchable() : _fake() { } + + search::queryeval::FakeSearchable &getFake() { return _fake; } + + /** + * Implements IndexSearchable + */ + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term) override + { + return _fake.createBlueprint(requestContext, field, term); + } + + search::SearchableStats getSearchableStats() const override { + return search::SearchableStats(); + } + + search::SerialNum getSerialNum() const override { return 0; } + void accept(IndexSearchableVisitor &visitor) const override { + (void) visitor; + } + + search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override { + (void) field_name; + return search::index::FieldLengthInfo(); + } + +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp new file mode 100644 index 00000000000..1675b6091cf --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/fusionrunner.cpp @@ -0,0 +1,133 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "fusionrunner.h" +#include "eventlogger.h" +#include "fusionspec.h" +#include <vespa/searchlib/common/serialnumfileheadercontext.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/util/dirtraverse.h> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.fusionrunner"); + +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::TuneFileIndexing; +using search::common::FileHeaderContext; +using search::common::SerialNumFileHeaderContext; +using search::index::Schema; +using search::queryeval::ISourceSelector; +using search::diskindex::SelectorArray; +using search::SerialNum; +using std::vector; +using vespalib::string; + +namespace searchcorespi::index { + +FusionRunner::FusionRunner(const string &base_dir, + const Schema &schema, + const TuneFileAttributes &tuneFileAttributes, + const FileHeaderContext &fileHeaderContext) + : _diskLayout(base_dir), + _schema(schema), + _tuneFileAttributes(tuneFileAttributes), + _fileHeaderContext(fileHeaderContext) +{ } + +FusionRunner::~FusionRunner() = default; + +namespace { + +void readSelectorArray(const string &selector_name, SelectorArray &selector_array, + const vector<uint8_t> &id_map, uint32_t base_id, uint32_t fusion_id) { + FixedSourceSelector::UP selector = + FixedSourceSelector::load(selector_name, fusion_id); + if (base_id != selector->getBaseId()) { + selector = selector->cloneAndSubtract("tmp_for_fusion", base_id - selector->getBaseId()); + } + + const uint32_t num_docs = selector->getDocIdLimit(); + selector_array.reserve(num_docs); + auto it = selector->createIterator(); + for (uint32_t i = 0; i < num_docs; ++i) { + search::queryeval::Source source = it->getSource(i); + // Workaround for source selector corruption. + // Treat out of range source as last source. + if (source >= id_map.size()) { + source = id_map.size() - 1; + } + assert(source < id_map.size()); + selector_array.push_back(id_map[source]); + } +} + +bool +writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id, + uint32_t highest_doc_id, + const TuneFileAttributes &tuneFileAttributes, + const FileHeaderContext &fileHeaderContext) +{ + const search::queryeval::Source default_source = 0; + FixedSourceSelector fusion_selector(default_source, "fusion_selector"); + fusion_selector.setSource(highest_doc_id, default_source); + fusion_selector.setBaseId(fusion_id); + string selector_name = IndexDiskLayout::getSelectorFileName(diskLayout.getFusionDir(fusion_id)); + if (!fusion_selector.extractSaveInfo(selector_name)->save(tuneFileAttributes, fileHeaderContext)) { + LOG(warning, "Unable to write source selector data for fusion.%u.", fusion_id); + return false; + } + return true; +} +} // namespace + +uint32_t +FusionRunner::fuse(const FusionSpec &fusion_spec, + SerialNum lastSerialNum, + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token) +{ + const vector<uint32_t> &ids = fusion_spec.flush_ids; + if (ids.empty()) { + return 0; + } + const uint32_t fusion_id = ids.back(); + const string fusion_dir = _diskLayout.getFusionDir(fusion_id); + + vector<string> sources; + vector<uint8_t> id_map(fusion_id + 1); + if (fusion_spec.last_fusion_id != 0) { + id_map[0] = sources.size(); + sources.push_back(_diskLayout.getFusionDir(fusion_spec.last_fusion_id)); + } + for (uint32_t id : ids) { + id_map[id - fusion_spec.last_fusion_id] = sources.size(); + sources.push_back(_diskLayout.getFlushDir(id)); + } + + if (LOG_WOULD_LOG(event)) { + EventLogger::diskFusionStart(sources, fusion_dir); + } + vespalib::Timer timer; + + const string selector_name = IndexDiskLayout::getSelectorFileName(_diskLayout.getFlushDir(fusion_id)); + SelectorArray selector_array; + readSelectorArray(selector_name, selector_array, id_map, fusion_spec.last_fusion_id, fusion_id); + + if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, lastSerialNum, flush_token)) { + return 0; + } + + const uint32_t highest_doc_id = selector_array.size() - 1; + SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, lastSerialNum); + if (!writeFusionSelector(_diskLayout, fusion_id, highest_doc_id, _tuneFileAttributes, fileHeaderContext)) { + return 0; + } + + if (LOG_WOULD_LOG(event)) { + EventLogger::diskFusionComplete(fusion_dir, vespalib::count_ms(timer.elapsed())); + } + return fusion_id; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/fusionrunner.h b/searchcore/src/vespa/searchcorespi/index/fusionrunner.h new file mode 100644 index 00000000000..92ad42b76ad --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/fusionrunner.h @@ -0,0 +1,65 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexmaintaineroperations.h" +#include "indexdisklayout.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/common/tunefileinfo.h> + +namespace search +{ + +namespace common +{ + +class FileHeaderContext; + +} + +} + +namespace searchcorespi { +namespace index { +struct FusionSpec; + +/** + * FusionRunner runs fusion on a set of disk indexes, specified as a + * vector of ids. The disk indexes must be stored in directories named + * "index.flush.<id>" within the base dir, and the fusioned indexes + * will be stored similarly in directories named "index.fusion.<id>". + **/ +class FusionRunner { + const IndexDiskLayout _diskLayout; + const search::index::Schema _schema; + const search::TuneFileAttributes _tuneFileAttributes; + const search::common::FileHeaderContext &_fileHeaderContext; + +public: + /** + * Create a FusionRunner that operates on indexes stored in the + * base dir. + **/ + FusionRunner(const vespalib::string &base_dir, + const search::index::Schema &schema, + const search::TuneFileAttributes &tuneFileAttributes, + const search::common::FileHeaderContext &fileHeaderContext); + ~FusionRunner(); + + /** + * Combine the indexes specified by the ids by running fusion. + * + * @param fusion_spec the specification on which indexes to run fusion on. + * @param lastSerialNum the serial number of the last flushed index part of the fusion spec. + * @param operations interface used for running the actual fusion. + * @return the id of the fusioned disk index + **/ + uint32_t fuse(const FusionSpec &fusion_spec, + search::SerialNum lastSerialNum, + IIndexMaintainerOperations &operations, + std::shared_ptr<search::IFlushToken> flush_token); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/fusionspec.h b/searchcore/src/vespa/searchcorespi/index/fusionspec.h new file mode 100644 index 00000000000..0b147140e55 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/fusionspec.h @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> +#include <vector> + +namespace searchcorespi::index { + +/** + * Specifies a set of disk index ids for fusion. + * + * Note: All ids in FusionSpec are absolute ids. + **/ +struct FusionSpec { + uint32_t last_fusion_id; + std::vector<uint32_t> flush_ids; + + FusionSpec() : last_fusion_id(0), flush_ids() {} +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/i_thread_service.h b/searchcore/src/vespa/searchcorespi/index/i_thread_service.h new file mode 100644 index 00000000000..f973908b62d --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/i_thread_service.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/runnable.h> +#include <vespa/vespalib/util/threadexecutor.h> + +namespace searchcorespi::index { + +/** + * Interface for a single thread used for write tasks. + */ +struct IThreadService : public vespalib::ThreadExecutor +{ + IThreadService(const IThreadService &) = delete; + IThreadService & operator = (const IThreadService &) = delete; + IThreadService() = default; + virtual ~IThreadService() {} + + /** + * Run the given runnable in the underlying thread and wait until its done. + */ + virtual void run(vespalib::Runnable &runnable) = 0; + + /** + * Returns whether the current thread is the underlying thread. + */ + virtual bool isCurrentThread() const = 0; +}; + +struct ISyncableThreadService : public IThreadService, vespalib::Syncable { + +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/idiskindex.h b/searchcore/src/vespa/searchcorespi/index/idiskindex.h new file mode 100644 index 00000000000..010e7e7727d --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/idiskindex.h @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchcorespi/index/indexsearchable.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi::index { + +/** + * Interface for a disk index as seen from an index maintainer. + */ +struct IDiskIndex : public IndexSearchable { + using SP = std::shared_ptr<IDiskIndex>; + virtual ~IDiskIndex() {} + + /** + * Returns the directory in which this disk index exists. + */ + virtual const vespalib::string &getIndexDir() const = 0; + + /** + * Returns the schema used by this disk index. + * Note that the schema should be part of the index on disk. + */ + virtual const search::index::Schema &getSchema() const = 0; +}; + +} + + diff --git a/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp new file mode 100644 index 00000000000..988c0084d4f --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/iindexcollection.cpp @@ -0,0 +1,46 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "iindexcollection.h" +#include "idiskindex.h" +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/searchlib/queryeval/isourceselector.h> + +namespace searchcorespi { + +using index::IDiskIndex; + +vespalib::string IIndexCollection::toString() const +{ + vespalib::asciistream s; + s << "selector : " << &getSourceSelector() << "(baseId=" << getSourceSelector().getBaseId() + << ", docidlimit=" << getSourceSelector().getDocIdLimit() + << ", defaultsource=" << uint32_t(getSourceSelector().getDefaultSource()) + << ")\n"; +#if 0 + search::queryeval::ISourceSelector::Iterator::UP it = getSourceSelector().createIterator(); + s << "{"; + for (size_t i(0), m(getSourceSelector().getDocIdLimit()); i < m; i++) { + s << uint32_t(it->getSource(i)) << ' '; + } + s << "}\n"; +#endif + s << getSourceCount() << " {"; + if (getSourceCount() > 0) { + for (size_t i(0), m(getSourceCount()); i < m; i++) { + if (i != 0) { + s << ", "; + } + const IndexSearchable & is(getSearchable(i)); + s << getSourceId(i) << " : " << &is << "("; + if (dynamic_cast<const IDiskIndex *>(&is) != NULL) { + s << dynamic_cast<const IDiskIndex &>(is).getIndexDir().c_str(); + } else { + s << typeid(is).name(); + } + s << ")"; + } + } + s << "}"; + return s.str(); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/iindexcollection.h b/searchcore/src/vespa/searchcorespi/index/iindexcollection.h new file mode 100644 index 00000000000..1cbf994b44f --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/iindexcollection.h @@ -0,0 +1,49 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "indexsearchable.h" + +namespace search::queryeval { class ISourceSelector; } +namespace searchcorespi { + +/** + * Interface for a set of index searchables with source ids, + * and a source selector for determining which index searchable to use for each document. + */ +class IIndexCollection { +protected: + using ISourceSelector = search::queryeval::ISourceSelector; +public: + typedef std::unique_ptr<IIndexCollection> UP; + typedef std::shared_ptr<IIndexCollection> SP; + + virtual ~IIndexCollection() {} + + /** + * Returns the source selector used to determine which index to use for each document. + */ + virtual const ISourceSelector &getSourceSelector() const = 0; + + /** + * Returns the number sources (index searchables) for this collection. + */ + virtual size_t getSourceCount() const = 0; + + /** + * Returns the index searchable for source i (i in the range [0, getSourceCount()>). + */ + virtual IndexSearchable &getSearchable(uint32_t i) const = 0; + + /** + * Returns the source id for source i (i in the range [0, getSourceCount()>). + * The source id is used for this source in the source selector. + */ + virtual uint32_t getSourceId(uint32_t i) const = 0; + + virtual vespalib::string toString() const; + +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h new file mode 100644 index 00000000000..9025b56dc27 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/iindexmaintaineroperations.h @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "idiskindex.h" +#include "imemoryindex.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/diskindex/docidmapper.h> +#include <vespa/searchlib/index/i_field_length_inspector.h> + +namespace search { class IFlushToken; } + +namespace searchcorespi::index { + +/** + * Interface for operations needed by an index maintainer. + */ +struct IIndexMaintainerOperations { + using IFieldLengthInspector = search::index::IFieldLengthInspector; + using Schema = search::index::Schema; + using SelectorArray = search::diskindex::SelectorArray; + virtual ~IIndexMaintainerOperations() {} + + /** + * Creates a new memory index using the given schema. + */ + virtual IMemoryIndex::SP createMemoryIndex(const Schema& schema, + const IFieldLengthInspector& inspector, + search::SerialNum serialNum) = 0; + + /** + * Loads a disk index from the given directory. + */ + virtual IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir) = 0; + + /** + * Reloads the given disk index and returns a new instance. + */ + virtual IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex) = 0; + + /** + * Runs fusion on a given set of input disk indexes to create a fusioned output disk index. + * The selector array contains a source for all local document ids ([0, docIdLimit>) + * in the range [0, sources.size()> and is used to determine in which input disk index + * a document is located. + * + * @param schema the schema of the resulting fusioned disk index. + * @param outputDir the output directory of the fusioned disk index. + * @param sources the directories of the input disk indexes. + * @param selectorArray the array specifying in which input disk index a document is located. + * @param lastSerialNum the serial number of the last operation in the last input disk index. + */ + virtual bool runFusion(const Schema &schema, + const vespalib::string &outputDir, + const std::vector<vespalib::string> &sources, + const SelectorArray &selectorArray, + search::SerialNum lastSerialNum, + std::shared_ptr<search::IFlushToken> flush_token) = 0; +}; + +} + + diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp b/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp new file mode 100644 index 00000000000..70770f6f012 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.cpp @@ -0,0 +1,8 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "iindexmanager.h" + +namespace searchcorespi { + +IIndexManager::Reconfigurer::~Reconfigurer() = default; + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/iindexmanager.h b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h new file mode 100644 index 00000000000..a4173b41aa5 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/iindexmanager.h @@ -0,0 +1,206 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "indexsearchable.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchcorespi/flush/flushstats.h> +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include <vespa/searchlib/common/serialnum.h> + +namespace vespalib { class IDestructorCallback; } +namespace document { class Document; } + +namespace searchcorespi { + +/** + * Interface for an index manager: + * - Keeps track of a set of indexes (i.e. both memory indexes and disk indexes). + * - Documents can be inserted, updated or removed in/from the active memory index. + * - Enables search across all the indexes. + * - Manages the set of indexes through flush targets to the flush engine (i.e. flushing of memory + * indexes and fusion of disk indexes). + * + * Key items in this interface is <em>lid</em> which is a local document id assigned to each document. + * This is a numeric id in the range [0...Maximum number of documents concurrently in this DB>. + * Local document id 0 is reserved. Another key item is the <em>serialnumber</em> which is the + * serial number an operation was given when first seen by the searchcore. This is a monotonic + * increasing number used for sequencing operations and figuring out how up to date componets are. + * Used during restart/replay and for deciding when to flush. Both the lid and the serial number + * are persisted alongside an operation to ensure correct playback during recovery. + */ +class IIndexManager { +protected: + using Document = document::Document; + using SerialNum = search::SerialNum; + using Schema = search::index::Schema; + using LidVector = std::vector<uint32_t>; +public: + using OnWriteDoneType = const std::shared_ptr<vespalib::IDestructorCallback> &; + + struct Configure { + virtual ~Configure() = default; + virtual bool configure() = 0; + }; + template <class FunctionType> + class LambdaConfigure : public Configure { + FunctionType _func; + + public: + LambdaConfigure(FunctionType &&func) + : _func(std::move(func)) + {} + ~LambdaConfigure() override = default; + bool configure() override { return _func(); } + }; + + template <class FunctionType> + static std::unique_ptr<Configure> + makeLambdaConfigure(FunctionType &&function) + { + return std::make_unique<LambdaConfigure<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); + } + + /** + * Interface used to signal when index manager has been reconfigured. + */ + struct Reconfigurer { + using Configure = searchcorespi::IIndexManager::Configure; + virtual ~Reconfigurer(); + /** + * Reconfigure index manager and infrastructure around it while system is in a quiescent state. + */ + virtual bool reconfigure(std::unique_ptr<Configure> configure) = 0; + }; + + typedef std::unique_ptr<IIndexManager> UP; + typedef std::shared_ptr<IIndexManager> SP; + + virtual ~IIndexManager() = default; + + /** + * Inserts a document into the index. This method is async, caller + * must either wait for notification about write done or sync + * indexFieldWriter executor in threading service to get sync + * behavior. + * + * If the inserted document id already exist the old version must + * be removed before inserting the new. + * + * @param lid The local document id for the document. + * + * @param doc The document to insert. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + * + * @param on_write_done shared object that notifies write done when + * destructed. + **/ + virtual void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done) = 0; + + /** + * Removes the given document from the index. This method is + * async, caller must either wait for notification about write + * done or sync indexFieldWriter executor in threading service to + * get sync behavior. + * + * @param lid The local document id for the document. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + **/ + void removeDocument(uint32_t lid, SerialNum serialNum) { + LidVector lids; + lids.push_back(lid); + removeDocuments(std::move(lids), serialNum); + } + virtual void removeDocuments(LidVector lids, SerialNum serialNum) = 0; + + /** + * Commits the document puts and removes since the last commit, + * making them searchable. This method is async, caller must + * either wait for notification about write done or sync + * indexFieldWriter executor in threading service to get sync + * behavior. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + * + * @param onWriteDone shared object that notifies write done when + * destructed. + **/ + virtual void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) = 0; + + /** + * This method is called on a regular basis to update each component with what is the highest + * serial number for any component. This is for all components to be able to correctly tell its age. + * + * @param serialNum The serial number of the last known operation. + */ + virtual void heartBeat(SerialNum serialNum) = 0; + + /** + * This method is called when lid space is compacted. + * + * @param lidLimit The new lid limit. + * @param serialNum The serial number of the lid space compaction operation. + */ + virtual void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) = 0; + + /** + * Returns the current serial number of the index. + * This should also reflect any heart beats. + * + * @return current serial number of the component. + **/ + virtual SerialNum getCurrentSerialNum() const = 0; + + /** + * Returns the serial number of the last flushed index. + * + * @return the serial number of the last flushed index. + **/ + virtual SerialNum getFlushedSerialNum() const = 0; + + /** + * Returns the searchable that will give the correct search view of the index manager. + * Normally switched everytime underlying index structures are changed in a way that can not be + * handled in a thread safe way without locking. For instance flushing of memory index or + * starting using a new schema. + * + * @return the current searchable. + **/ + virtual IndexSearchable::SP getSearchable() const = 0; + + /** + * Returns searchable stats for this index manager. + * + * @return statistics gathered about underlying memory and disk indexes. + */ + virtual search::SearchableStats getSearchableStats() const = 0; + + /** + * Returns the list of all flush targets contained in this index manager. + * + * @return The list of flushable items in this component. + **/ + virtual IFlushTarget::List getFlushTargets() = 0; + + /** + * Sets the new schema to be used by this index manager. + * + * @param schema The new schema to start using. + **/ + virtual void setSchema(const Schema &schema, SerialNum serialNum) = 0; + + /* + * Sets the max number of flushed indexes before fusion is urgent. + * + * @param maxFlushed The max number of flushed indexes before fusion is urgent. + */ + virtual void setMaxFlushed(uint32_t maxFlushed) = 0; +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/imemoryindex.h b/searchcore/src/vespa/searchcorespi/index/imemoryindex.h new file mode 100644 index 00000000000..67d6e034080 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/imemoryindex.h @@ -0,0 +1,85 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchcorespi/index/indexsearchable.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/memoryusage.h> + +namespace vespalib { class IDestructorCallback; } +namespace document { class Document; } +namespace searchcorespi::index { + +/** + * Interface for a memory index as seen from an index maintainer. + */ +struct IMemoryIndex : public searchcorespi::IndexSearchable { + using LidVector = std::vector<uint32_t>; + using SP = std::shared_ptr<IMemoryIndex>; + using OnWriteDoneType = const std::shared_ptr<vespalib::IDestructorCallback> &; + virtual ~IMemoryIndex() {} + + /** + * Returns true if this memory index has received any document insert operations. + */ + virtual bool hasReceivedDocumentInsert() const = 0; + + /** + * Returns the memory usage of this memory index. + */ + virtual vespalib::MemoryUsage getMemoryUsage() const = 0; + + /** + * Returns the memory usage of an empty version of this memory index. + */ + virtual uint64_t getStaticMemoryFootprint() const = 0; + + /** + * Inserts the given document into this memory index. + * If the document already exists it should be removed first. + * + * @param lid the local document id. + * @param doc the document to insert. + * @param on_write_done shared object that notifies write done when destructed. + */ + virtual void insertDocument(uint32_t lid, const document::Document &doc, OnWriteDoneType on_write_done) = 0; + + /** + * Removes the given document from this memory index. + * + * @param lid the local document id. + */ + void removeDocument(uint32_t lid) { + LidVector lids; + lids.push_back(lid); + removeDocuments(std::move(lids)); + } + virtual void removeDocuments(LidVector lids) = 0; + + /** + * Commits the inserts and removes since the last commit, making them searchable. + **/ + virtual void commit(OnWriteDoneType onWriteDone, search::SerialNum serialNum) = 0; + + /** + * Flushes this memory index to disk as a disk index. + * After a flush it should be possible to load a IDiskIndex from the flush directory. + * Note that the schema used when constructing the memory index should be flushed as well + * since a IDiskIndex should be able to return the schema used by the disk index. + * + * @param flushDir the directory in which to save the flushed index. + * @param docIdLimit the largest local document id used + 1 + * @param serialNum the serial number of the last operation to the memory index. + */ + virtual void flushToDisk(const vespalib::string &flushDir, + uint32_t docIdLimit, + search::SerialNum serialNum) = 0; + + virtual void pruneRemovedFields(const search::index::Schema &schema) = 0; + virtual search::index::Schema::SP getPrunedSchema() const = 0; +}; + +} + + diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h b/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h new file mode 100644 index 00000000000..335838ddf2e --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir.h @@ -0,0 +1,36 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace searchcorespi::index { + +/* + * Class naming a disk index for a document type. + */ +class IndexDiskDir { + uint32_t _id; + bool _fusion; +public: + IndexDiskDir(uint32_t id, bool fusion) noexcept + : _id(id), + _fusion(fusion) + { + } + IndexDiskDir() noexcept + : IndexDiskDir(0, false) + { + } + bool operator<(const IndexDiskDir& rhs) const noexcept { + if (_id != rhs._id) { + return _id < rhs._id; + } + return !_fusion && rhs._fusion; + } + bool operator==(const IndexDiskDir& rhs) const noexcept { + return (_id == rhs._id) && (_fusion == rhs._fusion); + } + bool valid() const noexcept { return _id != 0u; } + bool is_fusion_index() const noexcept { return _fusion; } + uint64_t get_id() const noexcept { return _id; } +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp new file mode 100644 index 00000000000..ffe33d704c8 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.cpp @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "index_disk_dir_state.h" +#include <cassert> + +namespace searchcorespi::index { + +void +IndexDiskDirState::deactivate() noexcept +{ + assert(_active_count > 0u); + --_active_count; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h new file mode 100644 index 00000000000..d8b790b3960 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_disk_dir_state.h @@ -0,0 +1,30 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <cstdint> +#include <optional> + +namespace searchcorespi::index { + +/* + * Class describing state for a disk index directory. + */ +class IndexDiskDirState { + uint32_t _active_count; + std::optional<uint64_t> _size_on_disk; +public: + IndexDiskDirState() + : _active_count(0), + _size_on_disk() + { + } + + void activate() noexcept { ++_active_count; } + void deactivate() noexcept; + bool is_active() const noexcept { return _active_count != 0; } + const std::optional<uint64_t>& get_size_on_disk() const noexcept { return _size_on_disk; } + void set_size_on_disk(uint64_t size_on_disk) noexcept { _size_on_disk = size_on_disk; } +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp new file mode 100644 index 00000000000..855f3c69bc9 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.cpp @@ -0,0 +1,74 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "index_manager_explorer.h" +#include "index_manager_stats.h" + +#include <vespa/vespalib/data/slime/cursor.h> + +using vespalib::slime::Cursor; +using vespalib::slime::Inserter; +using search::SearchableStats; +using searchcorespi::index::DiskIndexStats; +using searchcorespi::index::MemoryIndexStats; + +namespace searchcorespi { + +namespace { + +void +insertDiskIndex(Cursor &arrayCursor, const DiskIndexStats &diskIndex) +{ + Cursor &diskIndexCursor = arrayCursor.addObject(); + const SearchableStats &sstats = diskIndex.getSearchableStats(); + diskIndexCursor.setLong("serialNum", diskIndex.getSerialNum()); + diskIndexCursor.setString("indexDir", diskIndex.getIndexdir()); + diskIndexCursor.setLong("sizeOnDisk", sstats.sizeOnDisk()); +} + +void +insertMemoryUsage(Cursor &object, const vespalib::MemoryUsage &usage) +{ + Cursor &memory = object.setObject("memoryUsage"); + memory.setLong("allocatedBytes", usage.allocatedBytes()); + memory.setLong("usedBytes", usage.usedBytes()); + memory.setLong("deadBytes", usage.deadBytes()); + memory.setLong("onHoldBytes", usage.allocatedBytesOnHold()); +} + +void +insertMemoryIndex(Cursor &arrayCursor, const MemoryIndexStats &memoryIndex) +{ + Cursor &memoryIndexCursor = arrayCursor.addObject(); + const SearchableStats &sstats = memoryIndex.getSearchableStats(); + memoryIndexCursor.setLong("serialNum", memoryIndex.getSerialNum()); + memoryIndexCursor.setLong("docsInMemory", sstats.docsInMemory()); + insertMemoryUsage(memoryIndexCursor, sstats.memoryUsage()); +} + +} + + +IndexManagerExplorer::IndexManagerExplorer(IIndexManager::SP mgr) + : _mgr(std::move(mgr)) +{ +} + +void +IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const +{ + Cursor &object = inserter.insertObject(); + object.setLong("lastSerialNum", _mgr->getCurrentSerialNum()); + if (full) { + IndexManagerStats stats(*_mgr); + Cursor &diskIndexArrayCursor = object.setArray("diskIndexes"); + for (const auto &diskIndex : stats.getDiskIndexes()) { + insertDiskIndex(diskIndexArrayCursor, diskIndex); + } + Cursor &memoryIndexArrayCursor = object.setArray("memoryIndexes"); + for (const auto &memoryIndex : stats.getMemoryIndexes()) { + insertMemoryIndex(memoryIndexArrayCursor, memoryIndex); + } + } +} + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h new file mode 100644 index 00000000000..3e52199eeda --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_explorer.h @@ -0,0 +1,25 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexmanager.h" +#include <vespa/vespalib/net/state_explorer.h> + +namespace searchcorespi { + +/** + * Class used to explore the state of an index manager. + */ +class IndexManagerExplorer : public vespalib::StateExplorer +{ +private: + IIndexManager::SP _mgr; + +public: + IndexManagerExplorer(IIndexManager::SP mgr); + + // Implements vespalib::StateExplorer + virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; +}; + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp new file mode 100644 index 00000000000..a93934c1500 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.cpp @@ -0,0 +1,58 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "index_manager_stats.h" +#include "iindexmanager.h" +#include "indexsearchablevisitor.h" +#include "imemoryindex.h" + +namespace searchcorespi { + +namespace { + +class Visitor : public IndexSearchableVisitor +{ +public: + std::vector<index::DiskIndexStats> _diskIndexes; + std::vector<index::MemoryIndexStats> _memoryIndexes; + + Visitor(); + ~Visitor(); + void visit(const index::IDiskIndex &index) override { + _diskIndexes.emplace_back(index); + } + void visit(const index::IMemoryIndex &index) override { + _memoryIndexes.emplace_back(index); + } + + void normalize() { + std::sort(_diskIndexes.begin(), _diskIndexes.end()); + std::sort(_memoryIndexes.begin(), _memoryIndexes.end()); + } +}; + +Visitor::Visitor() = default; +Visitor::~Visitor() = default; + +} + +IndexManagerStats::IndexManagerStats() + : _diskIndexes(), + _memoryIndexes() +{ +} + +IndexManagerStats::IndexManagerStats(const IIndexManager &indexManager) + : _diskIndexes(), + _memoryIndexes() +{ + Visitor visitor; + IndexSearchable::SP searchable(indexManager.getSearchable()); + searchable->accept(visitor); + visitor.normalize(); + _diskIndexes = std::move(visitor._diskIndexes); + _memoryIndexes = std::move(visitor._memoryIndexes); +} + +IndexManagerStats::~IndexManagerStats() = default; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h new file mode 100644 index 00000000000..1e218a62660 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_manager_stats.h @@ -0,0 +1,31 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "disk_index_stats.h" +#include "memory_index_stats.h" +#include <vector> + +namespace searchcorespi { + +class IIndexManager; + +/** + * Information about an index manager usable by state explorer. + */ +class IndexManagerStats { + std::vector<index::DiskIndexStats> _diskIndexes; + std::vector<index::MemoryIndexStats> _memoryIndexes; +public: + IndexManagerStats(); + IndexManagerStats(const IIndexManager &indexManager); + ~IndexManagerStats(); + + const std::vector<index::DiskIndexStats> &getDiskIndexes() const { + return _diskIndexes; + } + const std::vector<index::MemoryIndexStats> &getMemoryIndexes() const { + return _memoryIndexes; + } +}; + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp new file mode 100644 index 00000000000..92de7d2d292 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.cpp @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "index_searchable_stats.h" +#include "indexsearchable.h" + + +namespace searchcorespi::index { + +IndexSearchableStats::IndexSearchableStats() + : _serialNum(0), + _searchableStats() +{ +} + +IndexSearchableStats::IndexSearchableStats(const IndexSearchable &index) + : _serialNum(index.getSerialNum()), + _searchableStats(index.getSearchableStats()) +{ +} + +bool IndexSearchableStats::operator<(const IndexSearchableStats &rhs) const +{ + return _serialNum < rhs._serialNum; +} + +} // namespace searchcorespi::index diff --git a/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h new file mode 100644 index 00000000000..a61245ddb5d --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/index_searchable_stats.h @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/util/searchable_stats.h> + +namespace searchcorespi { class IndexSearchable; } + +namespace searchcorespi::index { + +/** + * Information about a searchable index usable by state explorer. + */ +class IndexSearchableStats +{ + using SerialNum = search::SerialNum; + using SearchableStats = search::SearchableStats; + SerialNum _serialNum; + SearchableStats _searchableStats; +public: + IndexSearchableStats(); + IndexSearchableStats(const IndexSearchable &index); + bool operator<(const IndexSearchableStats &rhs) const; + SerialNum getSerialNum() const { return _serialNum; } + const SearchableStats &getSearchableStats() const { return _searchableStats; } +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp new file mode 100644 index 00000000000..d69f7d1b0a4 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexcollection.cpp @@ -0,0 +1,253 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexcollection.h" +#include "indexsearchablevisitor.h" +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h> +#include <vespa/searchlib/queryeval/intermediate_blueprints.h> +#include <vespa/searchlib/queryeval/leaf_blueprints.h> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexcollection"); + +using namespace search::queryeval; +using namespace search::query; +using search::attribute::IAttributeContext; +using search::index::FieldLengthInfo; + +namespace searchcorespi { + +IndexCollection::IndexCollection(const ISourceSelector::SP & selector) + : _source_selector(selector), + _sources() +{ +} + +IndexCollection::IndexCollection(const ISourceSelector::SP & selector, + const ISearchableIndexCollection &sources) + : _source_selector(selector), + _sources() +{ + for (size_t i(0), m(sources.getSourceCount()); i < m; i++) { + append(sources.getSourceId(i), sources.getSearchableSP(i)); + } + setCurrentIndex(sources.getCurrentIndex()); +} + +IndexCollection::~IndexCollection() = default; + +void +IndexCollection::setSource(uint32_t docId) +{ + assert( valid() ); + _source_selector->setSource(docId, getCurrentIndex()); +} + +ISearchableIndexCollection::UP +IndexCollection::replaceAndRenumber(const ISourceSelector::SP & selector, + const ISearchableIndexCollection &fsc, + uint32_t id_diff, + const IndexSearchable::SP &new_source) +{ + auto new_fsc = std::make_unique<IndexCollection>(selector); + new_fsc->append(0, new_source); + for (size_t i = 0; i < fsc.getSourceCount(); ++i) { + if (fsc.getSourceId(i) > id_diff) { + new_fsc->append(fsc.getSourceId(i) - id_diff, fsc.getSearchableSP(i)); + } + } + return new_fsc; +} + +void +IndexCollection::append(uint32_t id, const IndexSearchable::SP &fs) +{ + _sources.push_back(SourceWithId(id, fs)); +} + +IndexSearchable::SP +IndexCollection::getSearchableSP(uint32_t i) const +{ + return _sources[i].source_wrapper; +} + +void +IndexCollection::replace(uint32_t id, const IndexSearchable::SP &fs) +{ + for (size_t i = 0; i < _sources.size(); ++i) { + if (_sources[i].id == id) { + _sources[i].source_wrapper = fs; + return; + } + } + LOG(warning, "Tried to replace Searchable %d, but it wasn't there.", id); + append(id, fs); +} + +const ISourceSelector & +IndexCollection::getSourceSelector() const +{ + return *_source_selector; +} + +size_t +IndexCollection::getSourceCount() const +{ + return _sources.size(); +} + +IndexSearchable & +IndexCollection::getSearchable(uint32_t i) const +{ + return *_sources[i].source_wrapper; +} + +uint32_t +IndexCollection::getSourceId(uint32_t i) const +{ + return _sources[i].id; +} + +search::SearchableStats +IndexCollection::getSearchableStats() const +{ + search::SearchableStats stats; + for (size_t i = 0; i < _sources.size(); ++i) { + stats.merge(_sources[i].source_wrapper->getSearchableStats()); + } + return stats; +} + +search::SerialNum +IndexCollection::getSerialNum() const +{ + search::SerialNum serialNum = 0; + for (auto &source : _sources) { + serialNum = std::max(serialNum, source.source_wrapper->getSerialNum()); + } + return serialNum; +} + + +void +IndexCollection::accept(IndexSearchableVisitor &visitor) const +{ + for (auto &source : _sources) { + source.source_wrapper->accept(visitor); + } +} + +namespace { + +struct Mixer { + const ISourceSelector &_selector; + std::unique_ptr<SourceBlenderBlueprint> _blender; + + Mixer(const ISourceSelector &selector) + : _selector(selector), _blender() {} + + void addIndex(Blueprint::UP index) { + if ( ! _blender) { + _blender = std::make_unique<SourceBlenderBlueprint>(_selector); + } + _blender->addChild(std::move(index)); + } + + Blueprint::UP mix() { + if (_blender) { + return std::move(_blender); + } + return std::make_unique<EmptyBlueprint>(); + } +}; + +class CreateBlueprintVisitor : public search::query::QueryVisitor { +private: + const IIndexCollection &_indexes; + const FieldSpecList &_fields; + const IRequestContext &_requestContext; + Blueprint::UP _result; + + template <typename NodeType> + void visitTerm(NodeType &n) { + Mixer mixer(_indexes.getSourceSelector()); + for (size_t i = 0; i < _indexes.getSourceCount(); ++i) { + Blueprint::UP blueprint = _indexes.getSearchable(i).createBlueprint(_requestContext, _fields, n); + blueprint->setSourceId(_indexes.getSourceId(i)); + mixer.addIndex(std::move(blueprint)); + } + _result = mixer.mix(); + } + + void visit(And &) override { } + void visit(AndNot &) override { } + void visit(Or &) override { } + void visit(WeakAnd &) override { } + void visit(Equiv &) override { } + void visit(Rank &) override { } + void visit(Near &) override { } + void visit(ONear &) override { } + void visit(SameElement &) override { } + void visit(TrueQueryNode &) override {} + void visit(FalseQueryNode &) override {} + + void visit(WeightedSetTerm &n) override { visitTerm(n); } + void visit(DotProduct &n) override { visitTerm(n); } + void visit(WandTerm &n) override { visitTerm(n); } + void visit(Phrase &n) override { visitTerm(n); } + void visit(NumberTerm &n) override { visitTerm(n); } + void visit(LocationTerm &n) override { visitTerm(n); } + void visit(PrefixTerm &n) override { visitTerm(n); } + void visit(RangeTerm &n) override { visitTerm(n); } + void visit(StringTerm &n) override { visitTerm(n); } + void visit(SubstringTerm &n) override { visitTerm(n); } + void visit(SuffixTerm &n) override { visitTerm(n); } + void visit(PredicateQuery &n) override { visitTerm(n); } + void visit(RegExpTerm &n) override { visitTerm(n); } + void visit(NearestNeighborTerm &n) override { visitTerm(n); } + void visit(FuzzyTerm &n) override { visitTerm(n); } + +public: + CreateBlueprintVisitor(const IIndexCollection &indexes, + const FieldSpecList &fields, + const IRequestContext & requestContext) + : _indexes(indexes), + _fields(fields), + _requestContext(requestContext), + _result() {} + + Blueprint::UP getResult() { return std::move(_result); } +}; + +} + +Blueprint::UP +IndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term) +{ + FieldSpecList fields; + fields.add(field); + return createBlueprint(requestContext, fields, term); +} + +Blueprint::UP +IndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term) +{ + CreateBlueprintVisitor visitor(*this, fields, requestContext); + const_cast<Node &>(term).accept(visitor); + return visitor.getResult(); +} + +FieldLengthInfo +IndexCollection::get_field_length_info(const vespalib::string& field_name) const +{ + if (_sources.empty()) { + return FieldLengthInfo(); + } + return _sources.back().source_wrapper->get_field_length_info(field_name); +} + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/indexcollection.h b/searchcore/src/vespa/searchcorespi/index/indexcollection.h new file mode 100644 index 00000000000..d9fb8e973e1 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexcollection.h @@ -0,0 +1,69 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "isearchableindexcollection.h" +#include <vespa/searchlib/util/searchable_stats.h> + +namespace searchcorespi { + +/** + * Holds a set of index searchables with source ids, and a source selector for + * determining which index to use for each document. + */ +class IndexCollection : public ISearchableIndexCollection +{ + struct SourceWithId { + uint32_t id; + IndexSearchable::SP source_wrapper; + + SourceWithId(uint32_t id_in, const IndexSearchable::SP &source_in) + : id(id_in), source_wrapper(source_in) + {} + SourceWithId() : id(0), source_wrapper() {} + }; + + // Selector shared across memory dumps, replaced on disk fusion operations + using ISourceSelectorSP = std::shared_ptr<ISourceSelector>; + ISourceSelectorSP _source_selector; + std::vector<SourceWithId> _sources; + +public: + IndexCollection(const ISourceSelectorSP & selector); + IndexCollection(const ISourceSelectorSP & selector, const ISearchableIndexCollection &sources); + ~IndexCollection(); + + void append(uint32_t id, const IndexSearchable::SP &source) override; + void replace(uint32_t id, const IndexSearchable::SP &source) override; + IndexSearchable::SP getSearchableSP(uint32_t i) const override; + void setSource(uint32_t docId) override; + + + // Implements IIndexCollection + const ISourceSelector &getSourceSelector() const override; + size_t getSourceCount() const override; + IndexSearchable &getSearchable(uint32_t i) const override; + uint32_t getSourceId(uint32_t i) const override; + + // Implements IndexSearchable + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, const FieldSpec &field, const Node &term) override; + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, const FieldSpecList &fields, const Node &term) override; + search::SearchableStats getSearchableStats() const override; + search::SerialNum getSerialNum() const override; + void accept(IndexSearchableVisitor &visitor) const override; + + static ISearchableIndexCollection::UP + replaceAndRenumber(const ISourceSelectorSP & selector, const ISearchableIndexCollection &fsc, + uint32_t id_diff, const IndexSearchable::SP &new_source); + + // Implements IFieldLengthInspector + /** + * Returns field length info from the newest disk index, or empty info for all fields if no disk index exists. + */ + search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override; +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp new file mode 100644 index 00000000000..c701d1dfb1d --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.cpp @@ -0,0 +1,77 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexdisklayout.h" +#include "index_disk_dir.h" +#include <sstream> + +namespace searchcorespi::index { + +const vespalib::string +IndexDiskLayout::FlushDirPrefix = vespalib::string("index.flush."); + +const vespalib::string +IndexDiskLayout::FusionDirPrefix = vespalib::string("index.fusion."); + +const vespalib::string +IndexDiskLayout::SerialNumTag = vespalib::string("Serial num"); + +IndexDiskLayout::IndexDiskLayout(const vespalib::string &baseDir) + : _baseDir(baseDir) +{ +} + +vespalib::string +IndexDiskLayout::getFlushDir(uint32_t sourceId) const +{ + std::ostringstream ost; + ost << _baseDir << "/" << FlushDirPrefix << sourceId; + return ost.str(); +} + +vespalib::string +IndexDiskLayout::getFusionDir(uint32_t sourceId) const +{ + std::ostringstream ost; + ost << _baseDir << "/" << FusionDirPrefix << sourceId; + return ost.str(); +} + +vespalib::string +IndexDiskLayout::getSerialNumFileName(const vespalib::string &dir) +{ + return dir + "/serial.dat"; +} + +vespalib::string +IndexDiskLayout::getSchemaFileName(const vespalib::string &dir) +{ + return dir + "/schema.txt"; +} + +vespalib::string +IndexDiskLayout::getSelectorFileName(const vespalib::string &dir) +{ + return dir + "/selector"; +} + +IndexDiskDir +IndexDiskLayout::get_index_disk_dir(const vespalib::string& dir) +{ + auto name = dir.substr(dir.rfind('/') + 1); + const vespalib::string* prefix = nullptr; + bool fusion = false; + if (name.find(FlushDirPrefix) == 0) { + prefix = &FlushDirPrefix; + } else if (name.find(FusionDirPrefix) == 0) { + prefix = &FusionDirPrefix; + fusion = true; + } else { + return IndexDiskDir(); // invalid + } + std::istringstream ist(name.substr(prefix->size())); + uint32_t id = 0; + ist >> id; + return IndexDiskDir(id, fusion); // invalid if id == 0u +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h new file mode 100644 index 00000000000..94b35936cc7 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexdisklayout.h @@ -0,0 +1,38 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +class IndexDiskDir; + +/** + * Utility class used to get static aspects of the disk layout (i.e directory and file names) + * needed by the index maintainer. + */ +class IndexDiskLayout { +public: + static const vespalib::string FlushDirPrefix; + static const vespalib::string FusionDirPrefix; + static const vespalib::string SerialNumTag; + +private: + vespalib::string _baseDir; + +public: + IndexDiskLayout(const vespalib::string &baseDir); + vespalib::string getFlushDir(uint32_t sourceId) const; + vespalib::string getFusionDir(uint32_t sourceId) const; + static IndexDiskDir get_index_disk_dir(const vespalib::string& dir); + + static vespalib::string getSerialNumFileName(const vespalib::string &dir); + static vespalib::string getSchemaFileName(const vespalib::string &dir); + static vespalib::string getSelectorFileName(const vespalib::string &dir); +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp new file mode 100644 index 00000000000..e72525d0aaa --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -0,0 +1,83 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexflushtarget.h" +#include <vespa/vespalib/util/size_literals.h> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexflushtarget"); + +namespace searchcorespi::index { + +IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats) + : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), + _indexMaintainer(indexMaintainer), + _flushStats(flushStats), + _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()), + _maxFrozenMemoryIndexes(indexMaintainer.getMaxFrozenMemoryIndexes()), + _lastStats() +{ + _lastStats.setPathElementsToLog(7); +} + +IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer) + : IndexFlushTarget(indexMaintainer, indexMaintainer.getFlushStats()) +{} + +IndexFlushTarget::~IndexFlushTarget() = default; + +IFlushTarget::MemoryGain +IndexFlushTarget::getApproxMemoryGain() const +{ + return MemoryGain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes); +} + +IFlushTarget::DiskGain +IndexFlushTarget::getApproxDiskGain() const +{ + return DiskGain(0, 0); +} + +bool +IndexFlushTarget::needUrgentFlush() const +{ + // Due to limitation of 16G address space of single datastore + // TODO: Even better if urgency was decided by memory index itself. + bool urgent = (_numFrozenMemoryIndexes > _maxFrozenMemoryIndexes) || + (getApproxMemoryGain().gain() > ssize_t(16_Gi)); + SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum(); + LOG(debug, "Num frozen: %u Memory gain: %" PRId64 " Urgent: %d, flushedSerial=%" PRIu64, + _numFrozenMemoryIndexes, getApproxMemoryGain().gain(), static_cast<int>(urgent), flushedSerial); + return urgent; +} + +IFlushTarget::Time +IndexFlushTarget::getLastFlushTime() const +{ + return _indexMaintainer.getLastFlushTime(); +} + +IFlushTarget::SerialNum +IndexFlushTarget::getFlushedSerialNum() const +{ + return _indexMaintainer.getFlushedSerialNum(); +} + +IFlushTarget::Task::UP +IndexFlushTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken>) +{ + // the target must live until this task is done (handled by flush engine). + return _indexMaintainer.initFlush(serialNum, &_lastStats); +} + +uint64_t +IndexFlushTarget::getApproxBytesToWriteToDisk() const +{ + MemoryGain gain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes); + if (gain.getAfter() < gain.getBefore()) { + return gain.getBefore() - gain.getAfter(); + } else { + return 0; + } +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h new file mode 100644 index 00000000000..2b9ecc9574b --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexflushtarget.h @@ -0,0 +1,38 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "indexmaintainer.h" +#include <vespa/searchcorespi/flush/iflushtarget.h> + +namespace searchcorespi::index { + +/** + * Flush target for flushing a memory index in an IndexMaintainer. + **/ +class IndexFlushTarget : public IFlushTarget { +private: + IndexMaintainer &_indexMaintainer; + const IndexMaintainer::FlushStats _flushStats; + uint32_t _numFrozenMemoryIndexes; + uint32_t _maxFrozenMemoryIndexes; + FlushStats _lastStats; + +public: + explicit IndexFlushTarget(IndexMaintainer &indexMaintainer); + IndexFlushTarget(IndexMaintainer &indexMaintainer, IndexMaintainer::FlushStats flushStats); + ~IndexFlushTarget() override; + + // Implements IFlushTarget + MemoryGain getApproxMemoryGain() const override; + DiskGain getApproxDiskGain() const override; + SerialNum getFlushedSerialNum() const override; + Time getLastFlushTime() const override; + + bool needUrgentFlush() const override; + + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; + FlushStats getLastFlushStats() const override { return _lastStats; } + uint64_t getApproxBytesToWriteToDisk() const override; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp new file mode 100644 index 00000000000..1df6d321f99 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -0,0 +1,102 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexfusiontarget.h" + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexfusiontarget"); + +namespace searchcorespi::index { + +using search::SerialNum; +namespace { + +class Fusioner : public FlushTask { +private: + IndexMaintainer &_indexMaintainer; + FlushStats &_stats; + SerialNum _serialNum; + std::shared_ptr<search::IFlushToken> _flush_token; +public: + Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) : + _indexMaintainer(indexMaintainer), + _stats(stats), + _serialNum(serialNum), + _flush_token(std::move(flush_token)) + {} + + void run() override { + vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum, _flush_token); + // the target must live until this task is done (handled by flush engine). + _stats.setPath(outputFusionDir); + } + + SerialNum getFlushSerial() const override { + return 0u; // Zero means that no tls syncing is needed + } +}; + +} +IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer) + : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), + _indexMaintainer(indexMaintainer), + _fusionStats(indexMaintainer.getFusionStats()), + _lastStats() +{ + _lastStats.setPathElementsToLog(7); + LOG(debug, "New target, Num flushed: %d, Disk usage: %" PRIu64, _fusionStats.numUnfused, _fusionStats.diskUsage); +} + +IndexFusionTarget::~IndexFusionTarget() = default; + +IFlushTarget::MemoryGain +IndexFusionTarget::getApproxMemoryGain() const +{ + return MemoryGain(0, 0); +} + +IFlushTarget::DiskGain +IndexFusionTarget::getApproxDiskGain() const +{ + uint64_t diskUsageBefore = _fusionStats.diskUsage; + uint64_t diskUsageGain = static_cast<uint64_t>((0.1 * (diskUsageBefore * std::max(0,static_cast<int>(_fusionStats.numUnfused - 1))))); + diskUsageGain = std::min(diskUsageGain, diskUsageBefore); + if (!_fusionStats._canRunFusion) + diskUsageGain = 0; + return DiskGain(diskUsageBefore, diskUsageBefore - diskUsageGain); +} + +bool +IndexFusionTarget::needUrgentFlush() const +{ + bool urgent = (_fusionStats.numUnfused > _fusionStats.maxFlushed) && (_fusionStats._canRunFusion); + LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent); + return urgent; +} + +IFlushTarget::Time +IndexFusionTarget::getLastFlushTime() const +{ + return vespalib::system_clock::now(); +} + +IFlushTarget::SerialNum +IndexFusionTarget::getFlushedSerialNum() const +{ + // Lack of fusion operation doesn't prevent transaction log + // pruning. + return _indexMaintainer.getCurrentSerialNum(); +} + +IFlushTarget::Task::UP +IndexFusionTarget::initFlush(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) +{ + return std::make_unique<Fusioner>(_indexMaintainer, _lastStats, serialNum, std::move(flush_token)); +} + +uint64_t +IndexFusionTarget::getApproxBytesToWriteToDisk() const +{ + return _fusionStats.diskUsage; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h new file mode 100644 index 00000000000..7a9f44e6612 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "indexmaintainer.h" +#include <vespa/searchcorespi/flush/iflushtarget.h> + +namespace searchcorespi::index { + +/** + * Flush target for doing fusion on disk indexes in an IndexMaintainer. + **/ +class IndexFusionTarget : public IFlushTarget { +private: + IndexMaintainer &_indexMaintainer; + IndexMaintainer::FusionStats _fusionStats; + FlushStats _lastStats; + +public: + IndexFusionTarget(IndexMaintainer &indexMaintainer); + ~IndexFusionTarget() override; + + // Implements IFlushTarget + MemoryGain getApproxMemoryGain() const override; + DiskGain getApproxDiskGain() const override; + SerialNum getFlushedSerialNum() const override; + Time getLastFlushTime() const override; + bool needUrgentFlush() const override; + + Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override; + FlushStats getLastFlushStats() const override { return _lastStats; } + uint64_t getApproxBytesToWriteToDisk() const override; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp new file mode 100644 index 00000000000..af273bc5e45 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -0,0 +1,1363 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "diskindexcleaner.h" +#include "eventlogger.h" +#include "fusionrunner.h" +#include "indexflushtarget.h" +#include "indexfusiontarget.h" +#include "indexmaintainer.h" +#include "indexreadutilities.h" +#include "indexwriteutilities.h" +#include "index_disk_dir.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/searchcorespi/flush/lambdaflushtask.h> +#include <vespa/searchlib/common/i_flush_token.h> +#include <vespa/searchlib/index/schemautil.h> +#include <vespa/searchlib/util/dirtraverse.h> +#include <vespa/searchlib/util/filekit.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/array.hpp> +#include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/fastos/file.h> +#include <sstream> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexmaintainer"); + +using document::Document; +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::index::Schema; +using search::index::SchemaUtil; +using search::common::FileHeaderContext; +using search::queryeval::ISourceSelector; +using search::queryeval::Source; +using search::SerialNum; +using vespalib::makeLambdaTask; +using vespalib::makeSharedLambdaCallback; +using std::ostringstream; +using vespalib::string; +using vespalib::Executor; +using vespalib::Runnable; +using vespalib::IDestructorCallback; + +namespace searchcorespi::index { + +using Configure = IIndexManager::Configure; +using Reconfigurer = IIndexManager::Reconfigurer; + +namespace { + +class ReconfigRunnable : public Runnable { +public: + bool &_result; + Reconfigurer &_reconfigurer; + std::unique_ptr<Configure> _configure; + + ReconfigRunnable(bool &result, Reconfigurer &reconfigurer, std::unique_ptr<Configure> configure) + : _result(result), + _reconfigurer(reconfigurer), + _configure(std::move(configure)) + { } + + void run() override { + _result = _reconfigurer.reconfigure(std::move(_configure)); + } +}; + +class ReconfigRunnableTask : public Executor::Task { +private: + Reconfigurer &_reconfigurer; + std::unique_ptr<Configure> _configure; +public: + ReconfigRunnableTask(Reconfigurer &reconfigurer, std::unique_ptr<Configure> configure) + : _reconfigurer(reconfigurer), + _configure(std::move(configure)) + { } + ~ReconfigRunnableTask() override; + void run() override { + _reconfigurer.reconfigure(std::move(_configure)); + } +}; + +ReconfigRunnableTask::~ReconfigRunnableTask() = default; + +SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); + + +class DiskIndexWithDestructorCallback : public IDiskIndex { +private: + std::shared_ptr<IDestructorCallback> _callback; + IDiskIndex::SP _index; + IndexDiskDir _index_disk_dir; + IndexDiskLayout& _layout; + DiskIndexes& _disk_indexes; + +public: + DiskIndexWithDestructorCallback(IDiskIndex::SP index, + std::shared_ptr<IDestructorCallback> callback, + IndexDiskLayout& layout, + DiskIndexes& disk_indexes) noexcept + : _callback(std::move(callback)), + _index(std::move(index)), + _index_disk_dir(IndexDiskLayout::get_index_disk_dir(_index->getIndexDir())), + _layout(layout), + _disk_indexes(disk_indexes) + { + } + ~DiskIndexWithDestructorCallback() override; + const IDiskIndex &getWrapped() const { return *_index; } + + /** + * Implements searchcorespi::IndexSearchable + */ + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term) override + { + FieldSpecList fsl; + fsl.add(field); + return _index->createBlueprint(requestContext, fsl, term); + } + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term) override + { + return _index->createBlueprint(requestContext, fields, term); + } + search::SearchableStats getSearchableStats() const override; + search::SerialNum getSerialNum() const override { + return _index->getSerialNum(); + } + void accept(IndexSearchableVisitor &visitor) const override { + _index->accept(visitor); + } + + /** + * Implements IFieldLengthInspector + */ + search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override { + return _index->get_field_length_info(field_name); + } + + /** + * Implements IDiskIndex + */ + const vespalib::string &getIndexDir() const override { return _index->getIndexDir(); } + const search::index::Schema &getSchema() const override { return _index->getSchema(); } + +}; + +DiskIndexWithDestructorCallback::~DiskIndexWithDestructorCallback() = default; + +search::SearchableStats +DiskIndexWithDestructorCallback::getSearchableStats() const +{ + auto stats = _index->getSearchableStats(); + uint64_t transient_size = _disk_indexes.get_transient_size(_layout, _index_disk_dir); + stats.fusion_size_on_disk(transient_size); + return stats; +} + + +} // namespace + +IndexMaintainer::FrozenMemoryIndexRef::~FrozenMemoryIndexRef() = default; + +IndexMaintainer::FusionArgs::FusionArgs() + : _new_fusion_id(0u), + _changeGens(), + _schema(), + _prunedSchema(), + _old_source_list() +{ } + +IndexMaintainer::FusionArgs::~FusionArgs() = default; + +IndexMaintainer::SetSchemaArgs::SetSchemaArgs() = default; +IndexMaintainer::SetSchemaArgs::~SetSchemaArgs() = default; + +uint32_t +IndexMaintainer::getNewAbsoluteId() +{ + return _next_id++; +} + +string +IndexMaintainer::getFlushDir(uint32_t sourceId) const +{ + return _layout.getFlushDir(sourceId); +} + +string +IndexMaintainer::getFusionDir(uint32_t sourceId) const +{ + return _layout.getFusionDir(sourceId); +} + +bool +IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll) +{ + bool hasReopenedAnything(false); + assert(_ctx.getThreadingService().master().isCurrentThread()); + uint32_t count = coll.getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll.getSearchable(i); + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d == nullptr) { + continue; // not a disk index + } + const string indexDir = d->getIndexDir(); + vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir); + Schema trimmedSchema; + if (!trimmedSchema.loadFromFile(schemaName)) { + LOG(error, "Could not open schema '%s'", schemaName.c_str()); + } + if (trimmedSchema != d->getSchema()) { + IDiskIndex::SP newIndex(reloadDiskIndex(*d)); + coll.replace(coll.getSourceId(i), newIndex); + hasReopenedAnything = true; + } + } + return hasReopenedAnything; +} + +void +IndexMaintainer::updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum serialNum) +{ + // Called by a flush worker thread OR document db executor thread + LockGuard lock(_schemaUpdateLock); + IndexWriteUtilities::updateDiskIndexSchema(indexDir, schema, serialNum); +} + +void +IndexMaintainer::updateIndexSchemas(IIndexCollection &coll, + const Schema &schema, + SerialNum serialNum) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + uint32_t count = coll.getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll.getSearchable(i); + const auto *const d = dynamic_cast<const DiskIndexWithDestructorCallback *>(&is); + if (d == nullptr) { + IMemoryIndex *const m = dynamic_cast<IMemoryIndex *>(&is); + if (m != nullptr) { + m->pruneRemovedFields(schema); + } + continue; + } + updateDiskIndexSchema(d->getIndexDir(), schema, serialNum); + } +} + +void +IndexMaintainer::updateActiveFusionPrunedSchema(const Schema &schema) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + for (;;) { + Schema::SP activeFusionSchema; + Schema::SP activeFusionPrunedSchema; + Schema::SP newActiveFusionPrunedSchema; + { + LockGuard lock(_state_lock); + activeFusionSchema = _activeFusionSchema; + activeFusionPrunedSchema = _activeFusionPrunedSchema; + } + if (!activeFusionSchema) + return; // No active fusion + if (!activeFusionPrunedSchema) { + Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema); + newActiveFusionPrunedSchema = std::move(newSchema); + } else { + Schema::UP newSchema = Schema::intersect(*activeFusionPrunedSchema, schema); + newActiveFusionPrunedSchema = std::move(newSchema); + } + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + if (activeFusionSchema == _activeFusionSchema && + activeFusionPrunedSchema == _activeFusionPrunedSchema) + { + _activeFusionPrunedSchema = newActiveFusionPrunedSchema; + break; + } + } + } +} + +void +IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir) +{ + _disk_indexes->notActive(indexDir); + removeOldDiskIndexes(); +} + +IDiskIndex::SP +IndexMaintainer::loadDiskIndex(const string &indexDir) +{ + // Called by a flush worker thread OR CTOR (in document db init executor thread) + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadStart(indexDir); + } + vespalib::Timer timer; + auto index = _operations.loadDiskIndex(indexDir); + auto stats = index->getSearchableStats(); + _disk_indexes->setActive(indexDir, stats.sizeOnDisk()); + auto retval = std::make_shared<DiskIndexWithDestructorCallback>( + std::move(index), + makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + _layout, *_disk_indexes); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); + } + return retval; +} + +IDiskIndex::SP +IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) +{ + // Called by a flush worker thread OR document db executor thread + const string indexDir = oldIndex.getIndexDir(); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadStart(indexDir); + } + vespalib::Timer timer; + const IDiskIndex &wrappedDiskIndex = (dynamic_cast<const DiskIndexWithDestructorCallback &>(oldIndex)).getWrapped(); + auto index = _operations.reloadDiskIndex(wrappedDiskIndex); + auto stats = index->getSearchableStats(); + _disk_indexes->setActive(indexDir, stats.sizeOnDisk()); + auto retval = std::make_shared<DiskIndexWithDestructorCallback>( + std::move(index), + makeSharedLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + _layout, *_disk_indexes); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); + } + return retval; +} + +IDiskIndex::SP +IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, + uint32_t indexId, + uint32_t docIdLimit, + SerialNum serialNum, + FixedSourceSelector::SaveInfo &saveInfo) +{ + // Called by a flush worker thread + const string flushDir = getFlushDir(indexId); + memoryIndex.flushToDisk(flushDir, docIdLimit, serialNum); + Schema::SP prunedSchema(memoryIndex.getPrunedSchema()); + if (prunedSchema) { + updateDiskIndexSchema(flushDir, *prunedSchema, noSerialNumHigh); + } + IndexWriteUtilities::writeSourceSelector(saveInfo, indexId, getAttrTune(), + _ctx.getFileHeaderContext(), serialNum); + IndexWriteUtilities::writeSerialNum(serialNum, flushDir, _ctx.getFileHeaderContext()); + return loadDiskIndex(flushDir); +} + +ISearchableIndexCollection::UP +IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList) +{ + // Called by CTOR (in document db init executor thread) + uint32_t fusion_id = spec.last_fusion_id; + if (fusion_id != 0) { + sourceList->append(0, loadDiskIndex(getFusionDir(fusion_id))); + } + for (size_t i = 0; i < spec.flush_ids.size(); ++i) { + const uint32_t id = spec.flush_ids[i]; + const uint32_t relative_id = id - fusion_id; + sourceList->append(relative_id, loadDiskIndex(getFlushDir(id))); + } + return sourceList; +} + +namespace { + + using LockGuard = std::lock_guard<std::mutex>; + +ISearchableIndexCollection::SP +getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false) +{ + if (dynamic_cast<const WarmupIndexCollection *>(is.get()) != nullptr) { + if (warn) { + LOG(info, "Already warming up an index '%s'. Start using it immediately." + " This is an indication that you have configured your warmup interval too long.", + is->toString().c_str()); + } + const WarmupIndexCollection & wic(dynamic_cast<const WarmupIndexCollection &>(*is)); + return getLeaf(newSearchLock, wic.getNextIndexCollection(), warn); + } else { + return is; + } +} + +} + +/* + * Caller must hold _state_lock (SL). + */ +void +IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard lock(_new_search_lock); + ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + indexes->replace(sourceId, source); + swapInNewIndex(lock, std::move(indexes), *source); +} + +/* + * Caller must hold _state_lock (SL) and _new_search_lock (NSL), the latter + * passed as guard. + */ +void +IndexMaintainer::swapInNewIndex(LockGuard & guard, + ISearchableIndexCollection::SP indexes, + IndexSearchable & source) +{ + assert(indexes->valid()); + (void) guard; + if (_warmupConfig.getDuration() > vespalib::duration::zero()) { + if (dynamic_cast<const IDiskIndex *>(&source) != nullptr) { + LOG(debug, "Warming up a disk index."); + indexes = std::make_shared<WarmupIndexCollection> + (_warmupConfig, getLeaf(guard, _source_list, true), indexes, + static_cast<IDiskIndex &>(source), _ctx.getWarmupExecutor(), + _ctx.getThreadingService().clock(), *this); + } else { + LOG(debug, "No warmup needed as it is a memory index that is mapped in."); + } + } + LOG(debug, "Replacing indexcollection :\n%s\nwith\n%s", _source_list->toString().c_str(), indexes->toString().c_str()); + assert(indexes->valid()); + _source_list = std::move(indexes); +} + +/* + * Caller must hold _state_lock (SL). + */ +void +IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard lock(_new_search_lock); + ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + indexes->append(sourceId, source); + swapInNewIndex(lock, std::move(indexes), *source); +} + +ISearchableIndexCollection::UP +IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock) +{ + ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list)); + return std::make_unique<IndexCollection>(_selector, *currentLeaf); +} + +IndexMaintainer::FlushArgs::FlushArgs() + : old_index(), + old_absolute_id(0), + old_source_list(), + save_info(), + flush_serial_num(), + stats(nullptr), + _skippedEmptyLast(false), + _extraIndexes(), + _changeGens(), + _prunedSchema() +{ +} +IndexMaintainer::FlushArgs::~FlushArgs() = default; +IndexMaintainer::FlushArgs::FlushArgs(FlushArgs &&) = default; +IndexMaintainer::FlushArgs & IndexMaintainer::FlushArgs::operator=(FlushArgs &&) = default; + +bool +IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) +{ + // Called by initFlush via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + args->old_index = _current_index; + args->old_absolute_id = _current_index_id + _last_fusion_id; + args->old_source_list = _source_list; + string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id)); + args->flush_serial_num = current_serial_num(); + { + LockGuard lock(_index_update_lock); + // Handover of extra memory indexes to flush + args->_extraIndexes = _frozenMemoryIndexes; + _frozenMemoryIndexes.clear(); + } + + LOG(debug, "Flushing. Id = %u. Serial num = %llu", + args->old_absolute_id, (unsigned long long) args->flush_serial_num); + { + LockGuard lock(_index_update_lock); + if (!_current_index->hasReceivedDocumentInsert() && + _source_selector_changes == 0 && + !_flush_empty_current_index) + { + args->_skippedEmptyLast = true; // Skip flush of empty memory index + } + + if (!args->_skippedEmptyLast) { + // Keep on using same source selector with extended valid range + args->save_info = getSourceSelector().extractSaveInfo(selector_name); + // XXX: Overflow issue in source selector + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + _selector->setDefaultSource(_current_index_id); + _source_selector_changes = 0; + } + _current_index = *new_index; + _flush_empty_current_index = false; + } + if (args->_skippedEmptyLast) { + replaceSource(_current_index_id, _current_index); + } else { + appendSource(_current_index_id, _current_index); + } + _source_list->setCurrentIndex(_current_index_id); + return true; +} + +void +IndexMaintainer::doFlush(FlushArgs args) +{ + // Called by a flush worker thread + FlushIds flushIds; // Absolute ids of flushed indexes + + flushFrozenMemoryIndexes(args, flushIds); + + if (!args._skippedEmptyLast) { + flushLastMemoryIndex(args, flushIds); + } + + assert(!flushIds.empty()); + if (args.stats != nullptr) { + updateFlushStats(args); + } + + scheduleFusion(flushIds); +} + +void +IndexMaintainer::flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds) +{ + // Called by a flush worker thread + for (FrozenMemoryIndexRef & frozen : args._extraIndexes) { + assert(frozen._absoluteId < args.old_absolute_id); + assert(flushIds.empty() || flushIds.back() < frozen._absoluteId); + + FlushArgs eArgs; + eArgs.old_index = frozen._index; + eArgs.flush_serial_num = frozen._serialNum; + eArgs.old_absolute_id = frozen._absoluteId; + const uint32_t docIdLimit = frozen._saveInfo->getHeader()._docIdLimit; + + flushMemoryIndex(eArgs, docIdLimit, *frozen._saveInfo, flushIds); + + // Drop references to old memory index and old save info. + frozen._index.reset(); + frozen._saveInfo.reset(); + } +} + +void +IndexMaintainer::flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds) +{ + // Called by a flush worker thread + const uint32_t docIdLimit = args.save_info->getHeader()._docIdLimit; + flushMemoryIndex(args, docIdLimit, *args.save_info, flushIds); +} + +void +IndexMaintainer::updateFlushStats(const FlushArgs &args) +{ + // Called by a flush worker thread + vespalib::string flushDir; + if (!args._skippedEmptyLast) { + flushDir = getFlushDir(args.old_absolute_id); + } else { + assert(!args._extraIndexes.empty()); + flushDir = getFlushDir(args._extraIndexes.back()._absoluteId); + } + args.stats->setPath(flushDir); +} + +void +IndexMaintainer::flushMemoryIndex(FlushArgs &args, + uint32_t docIdLimit, + FixedSourceSelector::SaveInfo &saveInfo, + FlushIds &flushIds) +{ + // Called by a flush worker thread + ChangeGens changeGens = getChangeGens(); + IMemoryIndex &memoryIndex = *args.old_index; + Schema::SP prunedSchema = memoryIndex.getPrunedSchema(); + IDiskIndex::SP diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id, + docIdLimit, args.flush_serial_num, + saveInfo); + // Post processing after memory index has been written to disk and + // opened as disk index. + args._changeGens = changeGens; + args._prunedSchema = prunedSchema; + reconfigureAfterFlush(args, diskIndex); + + flushIds.push_back(args.old_absolute_id); +} + + +void +IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex) +{ + // Called by a flush worker thread + for (;;) { + // Call reconfig closure for this change + auto configure = makeLambdaConfigure([this, argsP=&args, diskIndexP=&diskIndex]() { + return doneFlush(argsP, diskIndexP); + }); + if (reconfigure(std::move(configure))) { + return; + } + ChangeGens changeGens = getChangeGens(); + Schema::SP prunedSchema = args.old_index->getPrunedSchema(); + const string indexDir = getFlushDir(args.old_absolute_id); + if (prunedSchema) { + updateDiskIndexSchema(indexDir, *prunedSchema, noSerialNumHigh); + } + IDiskIndex::SP reloadedDiskIndex = reloadDiskIndex(*diskIndex); + diskIndex = reloadedDiskIndex; + args._changeGens = changeGens; + args._prunedSchema = prunedSchema; + } +} + + +bool +IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) { + // Called by doFlush via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + IMemoryIndex &memoryIndex = *args->old_index; + if (args->_changeGens != getChangeGens()) { + return false; // Must retry operation + } + if (args->_prunedSchema != memoryIndex.getPrunedSchema()) { + return false; // Must retry operation + } + set_flush_serial_num(std::max(flush_serial_num(), args->flush_serial_num)); + vespalib::system_time timeStamp = search::FileKit::getModificationTime((*disk_index)->getIndexDir()); + _lastFlushTime = timeStamp > _lastFlushTime ? timeStamp : _lastFlushTime; + const uint32_t old_id = args->old_absolute_id - _last_fusion_id; + replaceSource(old_id, *disk_index); + return true; +} + +void +IndexMaintainer::scheduleFusion(const FlushIds &flushIds) +{ + // Called by a flush worker thread + LOG(debug, "Scheduled fusion for id %u.", flushIds.back()); + LockGuard guard(_fusion_lock); + for (uint32_t id : flushIds) { + _fusion_spec.flush_ids.push_back(id); + } +} + +bool +IndexMaintainer::canRunFusion(const FusionSpec &spec) const +{ + return spec.flush_ids.size() > 1 || + (spec.flush_ids.size() > 0 && spec.last_fusion_id != 0); +} + +bool +IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) +{ + // Called by runFusion via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + if (args->_changeGens != getChangeGens()) { + return false; // Must retry operation + } + if (args->_prunedSchema != getActiveFusionPrunedSchema()) { + return false; // Must retry operation + } + args->_old_source_list = _source_list; // delays destruction + uint32_t id_diff = args->_new_fusion_id - _last_fusion_id; + ostringstream ost; + ost << "sourceselector_fusion(" << args->_new_fusion_id << ")"; + { + LockGuard lock(_index_update_lock); + + // make new source selector with shifted values. + _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff); + _source_selector_changes = 0; + _current_index_id -= id_diff; + _last_fusion_id = args->_new_fusion_id; + _selector->setBaseId(_last_fusion_id); + _activeFusionSchema.reset(); + _activeFusionPrunedSchema.reset(); + } + + ISearchableIndexCollection::SP currentLeaf; + { + LockGuard lock(_new_search_lock); + currentLeaf = getLeaf(lock, _source_list); + } + ISearchableIndexCollection::UP fsc = + IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index); + fsc->setCurrentIndex(_current_index_id); + + { + LockGuard lock(_new_search_lock); + swapInNewIndex(lock, std::move(fsc), **new_index); + } + return true; +} + +bool +IndexMaintainer::makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive) +{ + // called by warmupDone via reconfigurer, warmupDone() doesn't wait for us + assert(_ctx.getThreadingService().master().isCurrentThread()); + ISearchableIndexCollection::SP warmIndex; + { + LockGuard state_lock(_state_lock); + if (keepAlive == _source_list) { + LockGuard lock(_new_search_lock); + warmIndex = (getLeaf(lock, _source_list, false)); + _source_list = warmIndex; + } + } + if (warmIndex) { + LOG(info, "New index warmed up and switched in : %s", warmIndex->toString().c_str()); + } + LOG(info, "Sync warmupExecutor."); + keepAlive->drainPending(); + LOG(info, "Now the keep alive of the warmupindexcollection should be gone."); + return true; +} + +void +IndexMaintainer::warmupDone(std::shared_ptr<WarmupIndexCollection> current) +{ + // Called by a search thread + LockGuard lock(_new_search_lock); + if (current == _source_list) { + auto makeSure = makeLambdaConfigure([this, collection=std::move(current)]() { + return makeSureAllRemainingWarmupIsDone(std::move(collection)); + }); + auto task = std::make_unique<ReconfigRunnableTask>(_ctx.getReconfigurer(), std::move(makeSure)); + _ctx.getThreadingService().master().execute(std::move(task)); + } else { + LOG(warning, "There has arrived a new IndexCollection while replacing the active index. " + "It can theoretically happen, but not very likely, so logging this as a warning."); + } +} + +namespace { + +bool +has_matching_interleaved_features(const Schema& old_schema, const Schema& new_schema) +{ + for (SchemaUtil::IndexIterator itr(new_schema); itr.isValid(); ++itr) { + if (itr.hasMatchingOldFields(old_schema) && + !itr.has_matching_use_interleaved_features(old_schema)) + { + return false; + } + } + return true; +} + +} + + +void +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor + LockGuard state_lock(_state_lock); + typedef FixedSourceSelector::SaveInfo SaveInfo; + args._oldSchema = _schema; // Delay destruction + args._oldIndex = _current_index; // Delay destruction + args._oldSourceList = _source_list; // Delay destruction + uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; + string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId)); + SerialNum freezeSerialNum = current_serial_num(); + bool dropEmptyLast = false; + SaveInfo::UP saveInfo; + + LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum); + { + LockGuard lock(_index_update_lock); + _schema = args._newSchema; + if (!_current_index->hasReceivedDocumentInsert()) { + dropEmptyLast = true; // Skip flush of empty memory index + } + + if (!dropEmptyLast) { + // Keep on using same source selector with extended valid range + saveInfo = getSourceSelector().extractSaveInfo(selectorName); + // XXX: Overflow issue in source selector + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + _selector->setDefaultSource(_current_index_id); + // Extra index to flush next time flushing is performed + _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); + } + _current_index = newIndex; + // Non-matching interleaved features in schemas means that we need to + // reconstruct or drop interleaved features in posting lists. + // If so, we must flush the new index to disk even if it is empty. + // This ensures that 2x triggerFlush will run fusion + // to reconstruct or drop interleaved features in the posting lists. + _flush_empty_current_index = !has_matching_interleaved_features(args._oldSchema, args._newSchema); + } + if (dropEmptyLast) { + replaceSource(_current_index_id, _current_index); + } else { + appendSource(_current_index_id, _current_index); + } + _source_list->setCurrentIndex(_current_index_id); +} + + +Schema +IndexMaintainer::getSchema(void) const +{ + LockGuard lock(_index_update_lock); + return _schema; +} + +Schema::SP +IndexMaintainer::getActiveFusionPrunedSchema(void) const +{ + LockGuard lock(_index_update_lock); + return _activeFusionPrunedSchema; +} + +TuneFileAttributes +IndexMaintainer::getAttrTune(void) +{ + return _tuneFileAttributes; +} + +IndexMaintainer::ChangeGens +IndexMaintainer::getChangeGens(void) +{ + LockGuard lock(_index_update_lock); + return _changeGens; +} + +bool +IndexMaintainer::reconfigure(std::unique_ptr<Configure> configure) +{ + // Called by a flush engine worker thread + bool result = false; + ReconfigRunnable runnable(result, _ctx.getReconfigurer(), std::move(configure)); + _ctx.getThreadingService().master().run(runnable); + return result; +} + +IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, + const IndexMaintainerContext &ctx, + IIndexMaintainerOperations &operations) + : _base_dir(config.getBaseDir()), + _warmupConfig(config.getWarmup()), + _disk_indexes(std::make_shared<DiskIndexes>()), + _layout(config.getBaseDir()), + _schema(config.getSchema()), + _activeFusionSchema(), + _activeFusionPrunedSchema(), + _source_selector_changes(0), + _selector(), + _source_list(), + _last_fusion_id(), + _next_id(), + _current_index_id(), + _current_index(), + _flush_empty_current_index(false), + _current_serial_num(0), + _flush_serial_num(0), + _lastFlushTime(), + _frozenMemoryIndexes(), + _state_lock(), + _index_update_lock(), + _new_search_lock(), + _remove_lock(), + _fusion_spec(), + _fusion_lock(), + _maxFlushed(config.getMaxFlushed()), + _maxFrozen(10), + _changeGens(), + _schemaUpdateLock(), + _tuneFileAttributes(config.getTuneFileAttributes()), + _ctx(ctx), + _operations(operations) +{ + // Called by document db init executor thread + _changeGens.bumpPruneGen(); + DiskIndexCleaner::clean(_base_dir, *_disk_indexes); + FusionSpec spec = IndexReadUtilities::readFusionSpec(_base_dir); + _next_id = 1 + (spec.flush_ids.empty() ? spec.last_fusion_id : spec.flush_ids.back()); + _last_fusion_id = spec.last_fusion_id; + + if (_next_id > 1) { + string latest_index_dir = spec.flush_ids.empty() + ? getFusionDir(_next_id - 1) + : getFlushDir(_next_id - 1); + + set_flush_serial_num(IndexReadUtilities::readSerialNum(latest_index_dir)); + _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir); + set_current_serial_num(flush_serial_num()); + const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir); + _selector = FixedSourceSelector::load(selector, _next_id - 1); + } else { + set_flush_serial_num(0); + _selector = std::make_shared<FixedSourceSelector>(0, "sourceselector", 1); + } + uint32_t baseId(_selector->getBaseId()); + if (_last_fusion_id != baseId) { + assert(_last_fusion_id > baseId); + uint32_t id_diff = _last_fusion_id - baseId; + ostringstream ost; + ost << "sourceselector_fusion(" << _last_fusion_id << ")"; + _selector = getSourceSelector().cloneAndSubtract(ost.str(), id_diff); + assert(_last_fusion_id == _selector->getBaseId()); + } + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + _selector->setDefaultSource(_current_index_id); + auto sourceList = loadDiskIndexes(spec, std::make_unique<IndexCollection>(_selector)); + _current_index = operations.createMemoryIndex(_schema, *sourceList, current_serial_num()); + LOG(debug, "Index manager created with flushed serial num %" PRIu64, flush_serial_num()); + sourceList->append(_current_index_id, _current_index); + sourceList->setCurrentIndex(_current_index_id); + _source_list = std::move(sourceList); + _fusion_spec = spec; + _ctx.getThreadingService().master().execute(makeLambdaTask([this,&config]() { + pruneRemovedFields(_schema, config.getSerialNum()); + })); + _ctx.getThreadingService().master().sync(); +} + +IndexMaintainer::~IndexMaintainer() +{ + _source_list.reset(); + _frozenMemoryIndexes.clear(); + _selector.reset(); +} + +FlushTask::UP +IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits + { + LockGuard lock(_index_update_lock); + set_current_serial_num(std::max(current_serial_num(), serialNum)); + } + + IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema(), *_current_index, current_serial_num())); + FlushArgs args; + args.stats = stats; + // Ensure that all index thread tasks accessing memory index have completed. + commit_and_wait(); + // Call reconfig closure for this change + auto configure = makeLambdaConfigure([this, argsP=&args, indexP=&new_index]() { + return doneInitFlush(argsP, indexP); + }); + bool success = _ctx.getReconfigurer().reconfigure(std::move(configure)); + assert(success); + (void) success; + if (args._skippedEmptyLast && args._extraIndexes.empty()) { + // No memory index to flush, it was empty + LockGuard lock(_state_lock); + set_flush_serial_num(current_serial_num()); + _lastFlushTime = vespalib::system_clock::now(); + LOG(debug, "No memory index to flush. Update serial number and flush time to current: " + "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)", + flush_serial_num(), vespalib::to_s(_lastFlushTime.time_since_epoch())); + return FlushTask::UP(); + } + SerialNum realSerialNum = args.flush_serial_num; + return makeLambdaFlushTask([this, myargs=std::move(args)]() mutable { doFlush(std::move(myargs)); }, realSerialNum); +} + +FusionSpec +IndexMaintainer::getFusionSpec() +{ + // Only called by unit test + LockGuard guard(_fusion_lock); + return _fusion_spec; +} + +string +IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token) +{ + // Called by a flush engine worker thread + + // Make sure to update serial num in case it is something that does not receive any data. + // XXX: Wrong, and will cause data loss. + // XXX: Missing locking. + // XXX: Claims to have flushed memory index when starting fusion. + { + LockGuard lock(_index_update_lock); + set_current_serial_num(std::max(current_serial_num(), serialNum)); + } + + FusionSpec spec; + { + LockGuard guard(_fusion_lock); + if (!canRunFusion(_fusion_spec)) + return ""; + spec = _fusion_spec; + _fusion_spec.flush_ids.clear(); + } + + uint32_t new_fusion_id = runFusion(spec, flush_token); + + LockGuard lock(_fusion_lock); + if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. + string fail_dir = getFusionDir(spec.flush_ids.back()); + if (flush_token->stop_requested()) { + LOG(info, "Fusion stopped for id %u, fusion dir \"%s\".", spec.flush_ids.back(), fail_dir.c_str()); + } else { + LOG(warning, "Fusion failed for id %u, fusion dir \"%s\".", spec.flush_ids.back(), fail_dir.c_str()); + } + // Restore fusion spec. + copy(_fusion_spec.flush_ids.begin(), _fusion_spec.flush_ids.end(), back_inserter(spec.flush_ids)); + _fusion_spec.flush_ids.swap(spec.flush_ids); + } else { + _fusion_spec.last_fusion_id = new_fusion_id; + } + return getFusionDir(new_fusion_id); +} + +namespace { + +class RemoveFusionIndexGuard { + DiskIndexes* _disk_indexes; + IndexDiskDir _index_disk_dir; +public: + RemoveFusionIndexGuard(DiskIndexes& disk_indexes, IndexDiskDir index_disk_dir) + : _disk_indexes(&disk_indexes), + _index_disk_dir(index_disk_dir) + { + _disk_indexes->add_not_active(index_disk_dir); + } + ~RemoveFusionIndexGuard() { + if (_disk_indexes != nullptr) { + (void) _disk_indexes->remove(_index_disk_dir); + } + } + void reset() { _disk_indexes = nullptr; } +}; + +} + +uint32_t +IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token) +{ + // Called by a flush engine worker thread + FusionArgs args; + TuneFileAttributes tuneFileAttributes(getAttrTune()); + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + _activeFusionSchema = std::make_shared<Schema>(_schema); + _activeFusionPrunedSchema.reset(); + args._schema = _schema; + } + FastOS_StatInfo statInfo; + string lastFlushDir(getFlushDir(fusion_spec.flush_ids.back())); + string lastSerialFile = IndexDiskLayout::getSerialNumFileName(lastFlushDir); + SerialNum serialNum = 0; + if (FastOS_File::Stat(lastSerialFile.c_str(), &statInfo)) { + serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); + } + IndexDiskDir fusion_index_disk_dir(fusion_spec.flush_ids.back(), true); + RemoveFusionIndexGuard remove_fusion_index_guard(*_disk_indexes, fusion_index_disk_dir); + FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, flush_token); + bool ok = (new_fusion_id != 0); + if (ok) { + ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), + getFusionDir(new_fusion_id)); + } + if (!ok) { + string fail_dir = getFusionDir(fusion_spec.flush_ids.back()); + if (flush_token->stop_requested()) { + LOG(info, "Fusion stopped, fusion dir \"%s\".", fail_dir.c_str()); + } else { + LOG(error, "Fusion failed, fusion dir \"%s\".", fail_dir.c_str()); + } + FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str()); + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + _activeFusionSchema.reset(); + _activeFusionPrunedSchema.reset(); + } + vespalib::File::sync(vespalib::dirname(fail_dir)); + return fusion_spec.last_fusion_id; + } + + const string new_fusion_dir = getFusionDir(new_fusion_id); + Schema::SP prunedSchema = getActiveFusionPrunedSchema(); + if (prunedSchema) { + updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh); + } + ChangeGens changeGens = getChangeGens(); + IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir)); + remove_fusion_index_guard.reset(); + + // Post processing after fusion operation has completed and new disk + // index has been opened. + + args._new_fusion_id = new_fusion_id; + args._changeGens = changeGens; + args._prunedSchema = prunedSchema; + for (;;) { + // Call reconfig closure for this change + bool success = reconfigure(makeLambdaConfigure([this,argsP=&args,indexP=&new_index]() { + return doneFusion(argsP, indexP); + })); + if (success) { + break; + } + changeGens = getChangeGens(); + prunedSchema = getActiveFusionPrunedSchema(); + if (prunedSchema) { + updateDiskIndexSchema(new_fusion_dir, *prunedSchema, noSerialNumHigh); + } + IDiskIndex::SP diskIndex2; + diskIndex2 = reloadDiskIndex(*new_index); + new_index = diskIndex2; + args._changeGens = changeGens; + args._prunedSchema = prunedSchema; + } + removeOldDiskIndexes(); + + return new_fusion_id; +} + +void +IndexMaintainer::removeOldDiskIndexes() +{ + LockGuard slock(_remove_lock); + DiskIndexCleaner::removeOldIndexes(_base_dir, *_disk_indexes); +} + +IndexMaintainer::FlushStats +IndexMaintainer::getFlushStats() const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + FlushStats stats; + uint64_t source_selector_bytes; + uint32_t source_selector_changes; + uint32_t numFrozen = 0; + { + LockGuard lock(_index_update_lock); + source_selector_bytes = _selector->getDocIdLimit() * sizeof(Source); + stats.memory_before_bytes += _current_index->getMemoryUsage().allocatedBytes() + source_selector_bytes; + stats.memory_after_bytes += _current_index->getStaticMemoryFootprint() + source_selector_bytes; + numFrozen = _frozenMemoryIndexes.size(); + for (const FrozenMemoryIndexRef & frozen : _frozenMemoryIndexes) { + stats.memory_before_bytes += frozen._index->getMemoryUsage().allocatedBytes() + source_selector_bytes; + } + source_selector_changes = _source_selector_changes; + } + + if (!source_selector_changes && stats.memory_after_bytes >= stats.memory_before_bytes) { + // Nothing is written if the index is empty. + stats.disk_write_bytes = 0; + stats.cpu_time_required = 0; + } else { + stats.disk_write_bytes = stats.memory_before_bytes + source_selector_bytes - stats.memory_after_bytes; + stats.cpu_time_required = source_selector_bytes * 3 * (1 + numFrozen) + stats.disk_write_bytes; + } + return stats; +} + +IndexMaintainer::FusionStats +IndexMaintainer::getFusionStats() const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + FusionStats stats; + IndexSearchable::SP source_list; + + { + LockGuard lock(_new_search_lock); + source_list = _source_list; + stats.maxFlushed = _maxFlushed; + } + stats.diskUsage = source_list->getSearchableStats().sizeOnDisk(); + { + LockGuard guard(_fusion_lock); + stats.numUnfused = _fusion_spec.flush_ids.size() + ((_fusion_spec.last_fusion_id != 0) ? 1 : 0); + stats._canRunFusion = canRunFusion(_fusion_spec); + } + LOG(debug, "Get fusion stats. Disk usage: %" PRIu64 ", maxflushed: %d", stats.diskUsage, stats.maxFlushed); + return stats; +} + +uint32_t +IndexMaintainer::getNumFrozenMemoryIndexes(void) const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + LockGuard state_lock(_index_update_lock); + return _frozenMemoryIndexes.size(); +} + +void +IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + try { + _current_index->insertDocument(lid, doc, on_write_done); + } catch (const vespalib::IllegalStateException & e) { + vespalib::string s = "Failed inserting document :\n" + doc.toXml(" ") + "\n"; + LOG(error, "%s", s.c_str()); + throw vespalib::IllegalStateException(s, e, VESPA_STRLOC); + } + _selector->setSource(lid, _current_index_id); + _source_list->setSource(lid); + ++_source_selector_changes; + set_current_serial_num(serialNum); +} + +void +IndexMaintainer::removeDocuments(LidVector lids, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + for (uint32_t lid : lids) { + _selector->setSource(lid, _current_index_id); + _source_list->setSource(lid); + } + _source_selector_changes += lids.size(); + set_current_serial_num(serialNum); + _current_index->removeDocuments(std::move(lids)); +} + +void +IndexMaintainer::commit_and_wait() +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + vespalib::Gate gate; + _ctx.getThreadingService().index().execute(makeLambdaTask([this, &gate]() { commit(gate); })); + // Ensure that all index thread tasks accessing memory index have completed. + gate.await(); +} + +void +IndexMaintainer::commit(vespalib::Gate& gate) +{ + // only triggered via commit_and_wait() + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + _current_index->commit(std::make_shared<vespalib::GateCallback>(gate), current_serial_num()); +} + +void +IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + set_current_serial_num(serialNum); + _current_index->commit(onWriteDone, serialNum); +} + +void +IndexMaintainer::heartBeat(SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + set_current_serial_num(serialNum); +} + +void +IndexMaintainer::compactLidSpace(uint32_t lidLimit, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LOG(info, "compactLidSpace(%u, %" PRIu64 ")", lidLimit, serialNum); + LockGuard lock(_index_update_lock); + set_current_serial_num(serialNum); + _selector->compactLidSpace(lidLimit); +} + +IFlushTarget::List +IndexMaintainer::getFlushTargets() +{ + // Called by flush engine scheduler thread + IFlushTarget::List ret; + ret.reserve(2); + ret.push_back(std::make_shared<IndexFlushTarget>(*this)); + ret.push_back(std::make_shared<IndexFusionTarget>(*this)); + return ret; +} + +void +IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + pruneRemovedFields(schema, serialNum); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, *_current_index, current_serial_num())); + SetSchemaArgs args; + + args._newSchema = schema; + // Ensure that all index thread tasks accessing memory index have completed. + commit_and_wait(); + // Everything should be quiet now. + doneSetSchema(args, new_index); + // Source collection has now changed, caller must reconfigure further + // as appropriate. +} + +void +IndexMaintainer::pruneRemovedFields(const Schema &schema, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + ISearchableIndexCollection::SP new_source_list; + IIndexCollection::SP coll = getSourceCollection(); + updateIndexSchemas(*coll, schema, serialNum); + updateActiveFusionPrunedSchema(schema); + { + LockGuard state_lock(_state_lock); + LockGuard lock(_index_update_lock); + _changeGens.bumpPruneGen(); + } + { + LockGuard state_lock(_state_lock); + new_source_list = std::make_shared<IndexCollection>(_selector, *_source_list); + } + if (reopenDiskIndexes(*new_source_list)) { + commit_and_wait(); + // Everything should be quiet now. + LockGuard state_lock(_state_lock); + LockGuard lock(_new_search_lock); + _source_list = new_source_list; + } +} + +void +IndexMaintainer::setMaxFlushed(uint32_t maxFlushed) +{ + LockGuard lock(_new_search_lock); + _maxFlushed = maxFlushed; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h new file mode 100644 index 00000000000..b3fb14e1c2e --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainer.h @@ -0,0 +1,376 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "iindexmanager.h" +#include "disk_indexes.h" +#include "fusionspec.h" +#include "idiskindex.h" +#include "iindexmaintaineroperations.h" +#include "indexdisklayout.h" +#include "indexmaintainerconfig.h" +#include "indexmaintainercontext.h" +#include "imemoryindex.h" +#include "warmupindexcollection.h" +#include "ithreadingservice.h" +#include "indexsearchable.h" +#include "indexcollection.h" +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include <vespa/searchcorespi/flush/flushstats.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/common/serialnum.h> +#include <atomic> + +namespace document { class Document; } + +namespace search::common { class FileHeaderContext; } +namespace vespalib { class Gate; } + +namespace searchcorespi::index { + +/** + * The IndexMaintainer provides a holistic view of a set of disk and + * memory indexes. It allows updating the active memory index, enables search + * across all indexes, and manages the set of indexes through flushing + * of memory indexes and fusion of disk indexes. + */ +class IndexMaintainer : public IIndexManager, + public IWarmupDone { + /** + * Extra memory that is frozen but not yet flushed. + */ + class FrozenMemoryIndexRef { + public: + typedef search::FixedSourceSelector::SaveInfo SaveInfo; + typedef search::SerialNum SerialNum; + typedef std::shared_ptr<SaveInfo> SaveInfoSP; + + IMemoryIndex::SP _index; + SerialNum _serialNum; + SaveInfoSP _saveInfo; + uint32_t _absoluteId; + + FrozenMemoryIndexRef(const IMemoryIndex::SP &index, + SerialNum serialNum, + SaveInfo::UP saveInfo, + uint32_t absoluteId) + : _index(index), + _serialNum(serialNum), + _saveInfo(saveInfo.release()), + _absoluteId(absoluteId) + { } + ~FrozenMemoryIndexRef(); + }; + + class ChangeGens { + public: + uint32_t _pruneGen; + + ChangeGens() : _pruneGen(0) { } + void bumpPruneGen(void) { ++_pruneGen; } + bool operator==(const ChangeGens &rhs) const { return _pruneGen == rhs._pruneGen; } + bool operator!=(const ChangeGens &rhs) const { return _pruneGen != rhs._pruneGen; } + }; + + using FlushIds = std::vector<uint32_t>; + using FrozenMemoryIndexRefs = std::vector<FrozenMemoryIndexRef>; + using ISourceSelector = search::queryeval::ISourceSelector; + using LockGuard = std::lock_guard<std::mutex>; + + const vespalib::string _base_dir; + const WarmupConfig _warmupConfig; + DiskIndexes::SP _disk_indexes; + IndexDiskLayout _layout; + Schema _schema; // Protected by SL + IUL + Schema::SP _activeFusionSchema; // Protected by SL + IUL + // Protected by SL + IUL + Schema::SP _activeFusionPrunedSchema; + uint32_t _source_selector_changes; // Protected by IUL + // _selector is protected by SL + IUL + ISourceSelector::SP _selector; + ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread + uint32_t _last_fusion_id; // Protected by SL + IUL + uint32_t _next_id; // Protected by SL + IUL + uint32_t _current_index_id; // Protected by SL + IUL + IMemoryIndex::SP _current_index; // Protected by SL + IUL + bool _flush_empty_current_index; + std::atomic<SerialNum> _current_serial_num;// Writes protected by IUL + std::atomic<SerialNum> _flush_serial_num; // Writes protected by SL + vespalib::system_time _lastFlushTime; // Protected by SL + // Extra frozen memory indexes. This list is empty unless new + // memory index has been added by force (due to config change or + // data structure limitations). + // Protected by SL + IUL + FrozenMemoryIndexRefs _frozenMemoryIndexes; + /* + * Locks protecting state. + * + * Note about locking: + * + * A variable can be protected by multiple locks, e.g. SL + NSL. + * To change the variable, all of these locks must be held. + * To read the variable, holding any of these locks is sufficient. + * + * In the example above, getting NSL typically has lower latency, but + * fewer variables can be retrieved. Getting SL has a higher latency + * and allows a snapshot of multiple variables depending on each other + * to be retrieved. + * + * Flush threads typically performs some setup (take SL, copy a + * relevant portion of variables to an args class (FlushArgs, + * FusionArgs, SetSchemaArgs) and perform some disk io) that takes + * time before scheduling a state change task performed by the + * document db master thread. + * + * The scheduled state change task will fail if state changed too much + * after setup by the flush thread. The flush thread must then retry + * with an updated setup. + * + * Things get more complicated when handling multiple kinds of overlapping + * flush operations, e.g. dump from memory to disk, fusion, schema changes + * and pruning of removed fields, since this will trigger more retries for + * some of the operations. + */ + std::mutex _state_lock; // Outer lock (SL) + mutable std::mutex _index_update_lock; // Inner lock (IUL) + mutable std::mutex _new_search_lock; // Inner lock (NSL) + std::mutex _remove_lock; // Lock for removing indexes. + // Protected by SL + IUL + FusionSpec _fusion_spec; // Protected by FL + mutable std::mutex _fusion_lock; // Fusion spec lock (FL) + uint32_t _maxFlushed; + uint32_t _maxFrozen; + ChangeGens _changeGens; // Protected by SL + IUL + std::mutex _schemaUpdateLock; // Serialize rewrite of schema + const search::TuneFileAttributes _tuneFileAttributes; + const IndexMaintainerContext _ctx; + IIndexMaintainerOperations &_operations; + + search::FixedSourceSelector & getSourceSelector() { return static_cast<search::FixedSourceSelector &>(*_selector); } + const search::FixedSourceSelector & getSourceSelector() const { return static_cast<const search::FixedSourceSelector &>(*_selector); } + uint32_t getNewAbsoluteId(); + vespalib::string getFlushDir(uint32_t sourceId) const; + vespalib::string getFusionDir(uint32_t sourceId) const; + + /** + * Will reopen diskindexes if necessary due to schema changes. + * @param coll Indexcollection that will be updated with reloaded index. + * @return true if any reload has been performed + */ + bool reopenDiskIndexes(ISearchableIndexCollection &coll); + + void updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum serialNum); + + void updateIndexSchemas(IIndexCollection &coll, + const Schema &schema, + SerialNum serialNum); + + void updateActiveFusionPrunedSchema(const Schema &schema); + void deactivateDiskIndexes(vespalib::string indexDir); + IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir); + IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex); + + IDiskIndex::SP flushMemoryIndex(IMemoryIndex &memoryIndex, + uint32_t indexId, + uint32_t docIdLimit, + SerialNum serialNum, + search::FixedSourceSelector::SaveInfo &saveInfo); + + ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList); + void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source); + void appendSource(uint32_t sourceId, const IndexSearchable::SP &source); + void swapInNewIndex(LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source); + ISearchableIndexCollection::UP createNewSourceCollection(const LockGuard &newSearchLock); + + struct FlushArgs { + IMemoryIndex::SP old_index; // Last memory index + uint32_t old_absolute_id; + ISearchableIndexCollection::SP old_source_list; // Delays destruction + search::FixedSourceSelector::SaveInfo::SP save_info; + SerialNum flush_serial_num; + FlushStats * stats; + bool _skippedEmptyLast; // Don't flush empty memory index + + // Extra indexes to flush before flushing last frozen memory index + // They are flushed before old_index. This list is empty unless + // new memory index has been added by force (due to config change + // or data structure limitations). + FrozenMemoryIndexRefs _extraIndexes; + ChangeGens _changeGens; + Schema::SP _prunedSchema; + + FlushArgs(); + FlushArgs(const FlushArgs &) = delete; + FlushArgs & operator=(const FlushArgs &) = delete; + FlushArgs(FlushArgs &&); + FlushArgs & operator=(FlushArgs &&); + ~FlushArgs(); + }; + + bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index); + void doFlush(FlushArgs args); + void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds); + void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds); + void updateFlushStats(const FlushArgs &args); + void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit, + search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds); + void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex); + bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index); + + + class FusionArgs { + public: + uint32_t _new_fusion_id; + ChangeGens _changeGens; + Schema _schema; + Schema::SP _prunedSchema; + ISearchableIndexCollection::SP _old_source_list; // Delays destruction + + FusionArgs(); + ~FusionArgs(); + }; + + void scheduleFusion(const FlushIds &flushIds); + bool canRunFusion(const FusionSpec &spec) const; + bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index); + + class SetSchemaArgs { + public: + Schema _newSchema; + Schema _oldSchema; + IMemoryIndex::SP _oldIndex; + ISearchableIndexCollection::SP _oldSourceList; // Delays destruction + + SetSchemaArgs(); + ~SetSchemaArgs(); + }; + + void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex); + + Schema getSchema(void) const; + Schema::SP getActiveFusionPrunedSchema() const; + search::TuneFileAttributes getAttrTune(); + ChangeGens getChangeGens(); + + /* + * Schedule document db executor task to use reconfigurer to + * reconfigure index manager with closure as argument. Wait for + * result. + */ + bool reconfigure(std::unique_ptr<Configure> configure); + void warmupDone(std::shared_ptr<WarmupIndexCollection> current) override; + bool makeSureAllRemainingWarmupIsDone(std::shared_ptr<WarmupIndexCollection> keepAlive); + void commit_and_wait(); + void commit(vespalib::Gate& gate); + void pruneRemovedFields(const Schema &schema, SerialNum serialNum); + [[nodiscard]] SerialNum current_serial_num() const noexcept { + return _current_serial_num.load(std::memory_order_relaxed); + } + void set_current_serial_num(SerialNum new_serial_num) noexcept { + _current_serial_num.store(new_serial_num, std::memory_order_relaxed); + } + [[nodiscard]] SerialNum flush_serial_num() const noexcept { + return _flush_serial_num.load(std::memory_order_relaxed); + } + void set_flush_serial_num(SerialNum new_serial_num) noexcept { + _flush_serial_num.store(new_serial_num, std::memory_order_relaxed); + } + +public: + IndexMaintainer(const IndexMaintainer &) = delete; + IndexMaintainer & operator = (const IndexMaintainer &) = delete; + IndexMaintainer(const IndexMaintainerConfig &config, + const IndexMaintainerContext &context, + IIndexMaintainerOperations &operations); + ~IndexMaintainer() override; + + /** + * Starts a new MemoryIndex, and dumps the previous one to disk. + * Updates flush stats when finished if specified. + **/ + FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats); + FusionSpec getFusionSpec(); + + /** + * Runs fusion for any available specs and return the output fusion directory. + */ + vespalib::string doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushToken> flush_token); + uint32_t runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token); + void removeOldDiskIndexes(); + + struct FlushStats { + explicit FlushStats(uint64_t memory_before=0) : + memory_before_bytes(memory_before), + memory_after_bytes(0), + disk_write_bytes(0), + cpu_time_required(0) + { } + + uint64_t memory_before_bytes; + uint64_t memory_after_bytes; + uint64_t disk_write_bytes; + uint64_t cpu_time_required; + }; + + struct FusionStats { + FusionStats() + : diskUsage(0), + maxFlushed(0), + numUnfused(0), + _canRunFusion(false) + { } + + uint64_t diskUsage; + uint32_t maxFlushed; + uint32_t numUnfused; + bool _canRunFusion; + }; + + /** + * Calculates an approximation of the cost of performing a flush. + **/ + FlushStats getFlushStats() const; + FusionStats getFusionStats() const; + const vespalib::string & getBaseDir() const { return _base_dir; } + uint32_t getNumFrozenMemoryIndexes() const; + uint32_t getMaxFrozenMemoryIndexes() const { return _maxFrozen; } + + vespalib::system_time getLastFlushTime() const { return _lastFlushTime; } + + // Implements IIndexManager + void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum, OnWriteDoneType on_write_done) override; + void removeDocuments(LidVector lids, SerialNum serialNum) override; + void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) override; + void heartBeat(search::SerialNum serialNum) override; + void compactLidSpace(uint32_t lidLimit, SerialNum serialNum) override; + + SerialNum getCurrentSerialNum() const override { + return current_serial_num(); + } + + SerialNum getFlushedSerialNum() const override { + return flush_serial_num(); + } + + IIndexCollection::SP getSourceCollection() const { + LockGuard lock(_new_search_lock); + return _source_list; + } + + searchcorespi::IndexSearchable::SP getSearchable() const override { + LockGuard lock(_new_search_lock); + return _source_list; + } + + search::SearchableStats getSearchableStats() const override { + LockGuard lock(_new_search_lock); + return _source_list->getSearchableStats(); + } + + IFlushTarget::List getFlushTargets() override; + void setSchema(const Schema & schema, SerialNum serialNum) override ; + void setMaxFlushed(uint32_t maxFlushed) override; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp new file mode 100644 index 00000000000..695de7b84ff --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp @@ -0,0 +1,27 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexmaintainerconfig.h" + +using search::index::Schema; +using search::TuneFileAttributes; + +namespace searchcorespi::index { + +IndexMaintainerConfig::IndexMaintainerConfig(const vespalib::string &baseDir, + const WarmupConfig & warmup, + size_t maxFlushed, + const Schema &schema, + const search::SerialNum serialNum, + const TuneFileAttributes &tuneFileAttributes) + : _baseDir(baseDir), + _warmup(warmup), + _maxFlushed(maxFlushed), + _schema(schema), + _serialNum(serialNum), + _tuneFileAttributes(tuneFileAttributes) +{ +} + +IndexMaintainerConfig::~IndexMaintainerConfig() { } + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h new file mode 100644 index 00000000000..3f890e6fa76 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainerconfig.h @@ -0,0 +1,64 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "warmupconfig.h" +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi::index { + +/** + * Class that keeps the config used when constructing an index maintainer. + */ +class IndexMaintainerConfig { +private: + const vespalib::string _baseDir; + const WarmupConfig _warmup; + const size_t _maxFlushed; + const search::index::Schema _schema; + const search::SerialNum _serialNum; + const search::TuneFileAttributes _tuneFileAttributes; + +public: + IndexMaintainerConfig(const vespalib::string &baseDir, + const WarmupConfig & warmup, + size_t maxFlushed, + const search::index::Schema &schema, + const search::SerialNum serialNum, + const search::TuneFileAttributes &tuneFileAttributes); + + ~IndexMaintainerConfig(); + + /** + * Returns the base directory in which the maintainer will store its indexes. + */ + const vespalib::string &getBaseDir() const { + return _baseDir; + } + + WarmupConfig getWarmup() const { return _warmup; } + + /** + * Returns the initial schema containing all current index fields. + */ + const search::index::Schema &getSchema() const { + return _schema; + } + + search::SerialNum getSerialNum() const { return _serialNum; } + + /** + * Returns the specification on how to read/write attribute vector data files. + */ + const search::TuneFileAttributes &getTuneFileAttributes() const { + return _tuneFileAttributes; + } + + size_t getMaxFlushed() const { + return _maxFlushed; + } +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp new file mode 100644 index 00000000000..efd7827fc3d --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.cpp @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexmaintainercontext.h" + +using search::common::FileHeaderContext; +using search::TuneFileAttributes; +using searchcorespi::IIndexManager; + +namespace searchcorespi::index { + +IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService, + IIndexManager::Reconfigurer &reconfigurer, + const FileHeaderContext &fileHeaderContext, + vespalib::Executor & warmupExecutor) + : _threadingService(threadingService), + _reconfigurer(reconfigurer), + _fileHeaderContext(fileHeaderContext), + _warmupExecutor(warmupExecutor) +{ +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h new file mode 100644 index 00000000000..2c7aa4af48e --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmaintainercontext.h @@ -0,0 +1,55 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "ithreadingservice.h" +#include "iindexmanager.h" +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/vespalib/util/threadexecutor.h> + +namespace searchcorespi::index { + +/** + * Class that keeps the long-lived context used by an index maintainer. + */ +class IndexMaintainerContext { +private: + IThreadingService &_threadingService; + IIndexManager::Reconfigurer &_reconfigurer; + const search::common::FileHeaderContext &_fileHeaderContext; + vespalib::Executor & _warmupExecutor; + +public: + IndexMaintainerContext(IThreadingService &threadingService, + IIndexManager::Reconfigurer &reconfigurer, + const search::common::FileHeaderContext &fileHeaderContext, + vespalib::Executor & warmupExecutor); + + /** + * Returns the treading service that encapsulates the thread model used for writing. + */ + IThreadingService &getThreadingService() const { + return _threadingService; + } + + /** + * Returns the reconfigurer used to signal when the index maintainer has changed. + */ + IIndexManager::Reconfigurer &getReconfigurer() const { + return _reconfigurer; + } + + /** + * Returns the context used to insert extra tags into file headers before writing them. + */ + const search::common::FileHeaderContext &getFileHeaderContext() const { + return _fileHeaderContext; + } + + /** + * @return The executor that should be used for warmup. + */ + vespalib::Executor & getWarmupExecutor() const { return _warmupExecutor; } +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp new file mode 100644 index 00000000000..8ba9efe2734 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.cpp @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexmanagerconfig.h" + +namespace searchcorespi { + +IndexManagerConfig::IndexManagerConfig(const vespalib::string &configId, + const config::ConfigSnapshot &configSnapshot, + size_t numSearcherThreads) + : _configId(configId), + _configSnapshot(configSnapshot), + _numSearcherThreads(numSearcherThreads) +{ +} + +IndexManagerConfig::~IndexManagerConfig() { } + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h new file mode 100644 index 00000000000..decb03d97e0 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexmanagerconfig.h @@ -0,0 +1,42 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/config/retriever/configsnapshot.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { + +/** + * Class that keeps the config used when constructing an index manager. + */ +class IndexManagerConfig { +private: + vespalib::string _configId; + const config::ConfigSnapshot &_configSnapshot; + size_t _numSearcherThreads; + +public: + IndexManagerConfig(const vespalib::string &configId, + const config::ConfigSnapshot &configSnapshot, + size_t numSearcherThreads); + ~IndexManagerConfig(); + + /** + * Returns the config id used to retrieve the configs from the config snapshot instance. + */ + const vespalib::string &getConfigId() const { return _configId; } + + /** + * Returns the snapshot containing configs to be used by the index manager. + */ + const config::ConfigSnapshot &getConfigSnapshot() const { return _configSnapshot; } + + /** + * Returns the number of searcher threads that are used to query the index manager. + */ + size_t getNumSearcherThreads() const { return _numSearcherThreads; } +}; + +} // namespace searchcorespi + + diff --git a/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp new file mode 100644 index 00000000000..14556ddef29 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.cpp @@ -0,0 +1,88 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "indexreadutilities.h" +#include "indexdisklayout.h" +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/vespalib/data/fileheader.h> +#include <set> +#include <vector> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexreadutilities"); + +using search::SerialNum; +using vespalib::FileHeader; + +namespace searchcorespi::index { + +namespace { + +/** + * Assumes that cleanup has removed all obsolete index dirs. + **/ +void +scanForIndexes(const vespalib::string &baseDir, + std::vector<vespalib::string> &flushDirs, + vespalib::string &fusionDir) +{ + FastOS_DirectoryScan dirScan(baseDir.c_str()); + while (dirScan.ReadNext()) { + if (!dirScan.IsDirectory()) { + continue; + } + vespalib::string name = dirScan.GetName(); + if (name.find(IndexDiskLayout::FlushDirPrefix) == 0) { + flushDirs.push_back(name); + } + if (name.find(IndexDiskLayout::FusionDirPrefix) == 0) { + if (!fusionDir.empty()) { + // Should never happen, since we run cleanup before load. + LOG(warning, "Base directory '%s' contains multiple fusion indexes", + baseDir.c_str()); + } + fusionDir = name; + } + } +} + +} + +FusionSpec +IndexReadUtilities::readFusionSpec(const vespalib::string &baseDir) +{ + std::vector<vespalib::string> flushDirs; + vespalib::string fusionDir; + scanForIndexes(baseDir, flushDirs, fusionDir); + + uint32_t fusionId = 0; + if (!fusionDir.empty()) { + fusionId = atoi(fusionDir.substr(IndexDiskLayout::FusionDirPrefix.size()).c_str()); + } + std::set<uint32_t> flushIds; + for (size_t i = 0; i < flushDirs.size(); ++i) { + uint32_t id = atoi(flushDirs[i].substr(IndexDiskLayout::FlushDirPrefix.size()).c_str()); + flushIds.insert(id); + } + + FusionSpec fusionSpec; + fusionSpec.last_fusion_id = fusionId; + fusionSpec.flush_ids.assign(flushIds.begin(), flushIds.end()); + return fusionSpec; +} + +SerialNum +IndexReadUtilities::readSerialNum(const vespalib::string &dir) +{ + const vespalib::string fileName = IndexDiskLayout::getSerialNumFileName(dir); + Fast_BufferedFile file; + file.ReadOpen(fileName.c_str()); + + FileHeader fileHeader; + fileHeader.readFile(file); + if (fileHeader.hasTag(IndexDiskLayout::SerialNumTag)) { + return fileHeader.getTag(IndexDiskLayout::SerialNumTag).asInteger(); + } + return 0; +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h new file mode 100644 index 00000000000..aeafd746772 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexreadutilities.h @@ -0,0 +1,23 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "fusionspec.h" +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Utility class with functions to read aspects of an index from disk. + * Used by the index maintainer. + */ +struct IndexReadUtilities { + static FusionSpec readFusionSpec(const vespalib::string &baseDir); + static search::SerialNum readSerialNum(const vespalib::string &dir); +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcore/src/vespa/searchcorespi/index/indexsearchable.h b/searchcore/src/vespa/searchcorespi/index/indexsearchable.h new file mode 100644 index 00000000000..609d7854351 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexsearchable.h @@ -0,0 +1,59 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcommon/attribute/iattributecontext.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/index/i_field_length_inspector.h> +#include <vespa/searchlib/query/tree/node.h> +#include <vespa/searchlib/queryeval/blueprint.h> +#include <vespa/searchlib/queryeval/field_spec.h> +#include <vespa/searchlib/queryeval/irequestcontext.h> +#include <vespa/searchlib/queryeval/searchable.h> +#include <vespa/searchlib/util/searchable_stats.h> + +namespace searchcorespi { + +class IndexSearchableVisitor; + +/** + * Abstract class extended by components to expose content that can be + * searched by a query term. A IndexSearchable component supports searching + * in one or more named fields. The Blueprint created by a Searchable + * is an intermediate query representation that is later used to + * create the actual search iterators used to produce matches. + * + * The class is a specialized version of search::queryeval::Searchable + * that let the components access a per query attribute context that expose + * attribute vectors that can be utilized during query evaluation. + **/ +class IndexSearchable : public search::queryeval::Searchable, + public search::index::IFieldLengthInspector { +protected: + using IRequestContext = search::queryeval::IRequestContext; + using FieldSpec = search::queryeval::FieldSpec; + using FieldSpecList = search::queryeval::FieldSpecList; + using Node = search::query::Node; + using IAttributeContext = search::attribute::IAttributeContext; + using Blueprint = search::queryeval::Blueprint; +public: + typedef std::shared_ptr<IndexSearchable> SP; + + /** + * Returns the searchable stats for this index searchable. + */ + virtual search::SearchableStats getSearchableStats() const = 0; + + /** + * Returns the serial number for this index searchable. + */ + virtual search::SerialNum getSerialNum() const = 0; + + /** + * Calls visitor with properly downcasted argument to differentiate + * between different types of indexes (disk index or memory index). + */ + virtual void accept(IndexSearchableVisitor &visitor) const = 0; +}; + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h b/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h new file mode 100644 index 00000000000..f85a2cf4af6 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexsearchablevisitor.h @@ -0,0 +1,26 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace searchcorespi { + +namespace index { + +struct IDiskIndex; +struct IMemoryIndex; + +} + +/* + * Interface for visiting an index searchable containing disk and + * memory indexes. + */ +class IndexSearchableVisitor +{ +public: + virtual ~IndexSearchableVisitor() { } + virtual void visit(const index::IDiskIndex &index) = 0; + virtual void visit(const index::IMemoryIndex &index) = 0; +}; + +} // namespace searchcorespi diff --git a/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp new file mode 100644 index 00000000000..cc2575f74d2 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.cpp @@ -0,0 +1,194 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "indexwriteutilities.h" +#include "indexdisklayout.h" +#include "indexreadutilities.h" +#include <vespa/searchlib/common/serialnumfileheadercontext.h> +#include <vespa/searchlib/index/schemautil.h> +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/exceptions.h> +#include <sstream> +#include <unistd.h> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexwriteutilities"); + + +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::common::FileHeaderContext; +using search::common::SerialNumFileHeaderContext; +using search::index::Schema; +using search::index::SchemaUtil; +using search::SerialNum; +using vespalib::IllegalStateException; +using vespalib::FileHeader; + +namespace searchcorespi::index { + +namespace { + +SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); + +} + +void +IndexWriteUtilities::writeSerialNum(SerialNum serialNum, + const vespalib::string &dir, + const FileHeaderContext &fileHeaderContext) +{ + const vespalib::string fileName = + IndexDiskLayout::getSerialNumFileName(dir); + const vespalib::string tmpFileName = fileName + ".tmp"; + + SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext, serialNum); + Fast_BufferedFile file; + file.WriteOpen(tmpFileName.c_str()); + FileHeader fileHeader; + snFileHeaderContext.addTags(fileHeader, fileName); + fileHeader.putTag(FileHeader::Tag(IndexDiskLayout::SerialNumTag, serialNum)); + bool ok = (fileHeader.writeFile(file) >= fileHeader.getSize()); + if ( ! ok) { + LOG(error, "Unable to write file header '%s'", tmpFileName.c_str()); + } + if ( ! file.Sync()) { + ok = false; + LOG(error, "Unable to fsync '%s'", tmpFileName.c_str()); + } + if ( ! file.Close()) { + ok = false; + LOG(error, "Unable to close '%s'", tmpFileName.c_str()); + } + vespalib::File::sync(dir); + + if (ok) { + FastOS_File renameFile(tmpFileName.c_str()); + ok &= renameFile.Rename(fileName.c_str()); + } + if (!ok) { + std::ostringstream msg; + msg << "Unable to write serial number to '" << dir << "'."; + throw IllegalStateException(msg.str()); + } + vespalib::File::sync(dir); +} + +bool +IndexWriteUtilities::copySerialNumFile(const vespalib::string &sourceDir, + const vespalib::string &destDir) +{ + vespalib::string source = IndexDiskLayout::getSerialNumFileName(sourceDir); + vespalib::string dest = IndexDiskLayout::getSerialNumFileName(destDir); + vespalib::string tmpDest = dest + ".tmp"; + if (!FastOS_FileInterface::CopyFile(source.c_str(), tmpDest.c_str())) { + LOG(error, "Unable to copy file '%s'", source.c_str()); + return false; + } + FastOS_File file(tmpDest.c_str()); + if (!file.OpenReadWrite()) { + LOG(error, "Unable to open '%s' for fsync", tmpDest.c_str()); + return false; + } + if (!file.Sync()) { + LOG(error, "Unable to fsync '%s'", tmpDest.c_str()); + return false; + } + if (!file.Close()) { + LOG(error, "Unable to close '%s'", tmpDest.c_str()); + return false; + } + vespalib::File::sync(destDir); + if (!file.Rename(dest.c_str())) { + LOG(error, "Unable to rename file '%s' to '%s'", tmpDest.c_str(), dest.c_str()); + return false; + } + vespalib::File::sync(destDir); + return true; +} + +void +IndexWriteUtilities::writeSourceSelector(FixedSourceSelector::SaveInfo & + saveInfo, + uint32_t sourceId, + const TuneFileAttributes & + tuneFileAttributes, + const FileHeaderContext & + fileHeaderContext, + SerialNum serialNum) +{ + SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext, + serialNum); + if (!saveInfo.save(tuneFileAttributes, snFileHeaderContext)) { + std::ostringstream msg; + msg << "Flush of sourceselector failed. Source id = " << sourceId; + throw IllegalStateException(msg.str()); + } +} + +void +IndexWriteUtilities::updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum serialNum) +{ + vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir); + Schema oldSchema; + if (!oldSchema.loadFromFile(schemaName)) { + LOG(error, "Could not open schema '%s'", + schemaName.c_str()); + return; + } + if (!SchemaUtil::validateSchema(oldSchema)) { + LOG(error, "Could not validate schema loaded from '%s'", + schemaName.c_str()); + return; + } + Schema::UP newSchema = Schema::intersect(oldSchema, schema); + if (*newSchema == oldSchema) { + return; + } + if (serialNum != noSerialNumHigh) { + SerialNum oldSerial = IndexReadUtilities::readSerialNum(indexDir); + if (oldSerial >= serialNum) { + return; + } + } + vespalib::string schemaTmpName = schemaName + ".tmp"; + vespalib::string schemaOrigName = schemaName + ".orig"; + vespalib::unlink(schemaTmpName); + if (!newSchema->saveToFile(schemaTmpName)) { + LOG(error, "Could not save schema to '%s'", + schemaTmpName.c_str()); + } + // XXX: FastOS layer violation + FastOS_StatInfo statInfo; + bool statres; + statres = FastOS_File::Stat(schemaOrigName.c_str(), &statInfo); + if (!statres) { + if (statInfo._error != FastOS_StatInfo::FileNotFound) { + LOG(error, "Failed to stat orig schema '%s': %s", + schemaOrigName.c_str(), + FastOS_File::getLastErrorString().c_str()); + } + int linkres = ::link(schemaName.c_str(), schemaOrigName.c_str()); + if (linkres != 0) { + LOG(error, "Could not link '%s' to '%s': %s", + schemaOrigName.c_str(), + schemaName.c_str(), + FastOS_File::getLastErrorString().c_str()); + } + vespalib::File::sync(indexDir); + } + // XXX: FastOS layer violation + int renameres = ::rename(schemaTmpName.c_str(), schemaName.c_str()); + if (renameres != 0) { + int error = errno; + std::string errString = FastOS_File::getErrorString(error); + LOG(error, "Could not rename '%s' to '%s': %s", + schemaTmpName.c_str(), + schemaName.c_str(), + errString.c_str()); + } + vespalib::File::sync(indexDir); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h new file mode 100644 index 00000000000..313ab3cc1c7 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/indexwriteutilities.h @@ -0,0 +1,44 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi::index { + +/** + * Utility class with functions to write aspects of an index to disk. + * Used by the index maintainer. + */ +struct IndexWriteUtilities +{ + static void + writeSerialNum(search::SerialNum serialNum, + const vespalib::string &dir, + const search::common::FileHeaderContext &fileHeaderContext); + + static bool + copySerialNumFile(const vespalib::string &sourceDir, + const vespalib::string &destDir); + + static void + writeSourceSelector(search::FixedSourceSelector::SaveInfo &saveInfo, + uint32_t sourceId, + const search::TuneFileAttributes &tuneFileAttributes, + const search::common::FileHeaderContext & + fileHeaderContext, + search::SerialNum serialNum); + + static void + updateDiskIndexSchema(const vespalib::string &indexDir, + const search::index::Schema &schema, + search::SerialNum serialNum); +}; + +} + + diff --git a/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp new file mode 100644 index 00000000000..85e87965cb7 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.cpp @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "isearchableindexcollection.h" +#include <vespa/searchlib/queryeval/isourceselector.h> + +namespace searchcorespi { + +using search::queryeval::ISourceSelector; + +void +ISearchableIndexCollection::setCurrentIndex(uint32_t id) +{ + assert( id < ISourceSelector::SOURCE_LIMIT); + + _currentIndex = id; +} + +uint32_t +ISearchableIndexCollection::getCurrentIndex() const +{ + assert( valid() ); + + return _currentIndex; +} + +bool +ISearchableIndexCollection::valid() const +{ + return (_currentIndex > 0) && (_currentIndex < ISourceSelector::SOURCE_LIMIT); +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h new file mode 100644 index 00000000000..efd7062af71 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/isearchableindexcollection.h @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexcollection.h" +#include "indexsearchable.h" + +namespace searchcorespi { + +/** + * Interface to both an IndexCollection and to an IndexSearchable + */ +class ISearchableIndexCollection : public IIndexCollection, + public IndexSearchable { +public: + ISearchableIndexCollection() : _currentIndex(-1) { } + using UP = std::unique_ptr<ISearchableIndexCollection>; + using SP = std::shared_ptr<ISearchableIndexCollection>; + + virtual void append(uint32_t id, const IndexSearchable::SP &source) = 0; + virtual void replace(uint32_t id, const IndexSearchable::SP &source) = 0; + virtual IndexSearchable::SP getSearchableSP(uint32_t i) const = 0; + virtual void setSource(uint32_t docId) = 0; + + void setCurrentIndex(uint32_t id); + uint32_t getCurrentIndex() const; + bool valid() const; + +private: + int32_t _currentIndex; +}; + +} // namespace searchcorespi + diff --git a/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h new file mode 100644 index 00000000000..c325d5ded11 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h @@ -0,0 +1,88 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_thread_service.h" + +class FNET_Transport; + +namespace vespalib { + class ISequencedTaskExecutor; + class Clock; +} +namespace searchcorespi::index { + +/** + * Interface for the thread model used for write tasks for a single document database. + * + * We have multiple write threads: + * + * 1. The "master" write thread used for the majority of write tasks. + * + * 2. The "index" write thread used for doing changes to the memory + * index, either directly (for data not bound to a field) or via + * index field inverter executor or index field writer executor. + * + * 3. The "summary" thread is used for doing changes to the document store. + * + * 4. The "index field inverter" executor is used to populate field + * inverters with data from document fields. Scheduled tasks for + * the same field are executed in sequence. + * + * 5. The "index field writer" executor is used to sort data in field + * inverters before pushing the data to the memory field indexes. + * Scheduled tasks for the same field are executed in sequence. + * + * 6. The "attribute field writer" executor is used to write data to attribute vectors. + * Each attribute is always handled by the same thread, + * and scheduled tasks for the same attribute are executed in sequence. + * + * The master write thread is always the one giving tasks to the other write threads above. + * + * In addition this interface exposes the "shared" executor that is used by all document databases. + * This is among others used for compressing / de-compressing documents in the document store, + * merging files as part of disk index fusion, and running the prepare step when doing two-phase + * puts against a tensor attribute with a HNSW index. + * + * The index write thread extracts fields from documents and gives + * task to the index field inverter executor and the index field + * writer executor. + * + * The index field inverter executor and index field writer executor + * are separate to allow for double buffering, i.e. populate one set + * of field inverters using the index field inverter executor while + * another set of field inverters are handled by the index field + * writer executor. + * + * We might decide to allow index field inverter tasks to schedule + * tasks to the index field writer executor, so draining logic needs + * to sync index field inverter executor before syncing index field + * writer executor. + * + * TODO: * indexFieldInverter and indexFieldWriter can be collapsed to one. Both need sequencing, + * but they sequence on different things so efficiency will be the same and just depends on #threads + */ +struct IThreadingService +{ + IThreadingService(const IThreadingService &) = delete; + IThreadingService & operator = (const IThreadingService &) = delete; + IThreadingService() = default; + virtual ~IThreadingService() = default; + + /** + * Block the calling thread until the master thread has capacity to handle more tasks, + * and then execute the given task in the master thread. + */ + virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0; + + virtual ISyncableThreadService &master() = 0; + virtual IThreadService &index() = 0; + virtual vespalib::ThreadExecutor &summary() = 0; + virtual vespalib::Executor &shared() = 0; + virtual FNET_Transport &transport() = 0; + virtual const vespalib::Clock &clock() const = 0; + virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0; + virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0; + virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h b/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h new file mode 100644 index 00000000000..ccc85ab4dc6 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/memory_index_stats.h @@ -0,0 +1,10 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "index_searchable_stats.h" + +namespace searchcorespi::index { + +using MemoryIndexStats = IndexSearchableStats; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/warmupconfig.h b/searchcore/src/vespa/searchcorespi/index/warmupconfig.h new file mode 100644 index 00000000000..8582b7256bc --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/warmupconfig.h @@ -0,0 +1,22 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/time.h> + +namespace searchcorespi::index { + +/** + * Keeps all config for controlling warmup. + **/ +class WarmupConfig { +public: + WarmupConfig() : _duration(vespalib::duration::zero()), _unpack(false) { } + WarmupConfig(vespalib::duration duration, bool unpack) : _duration(duration), _unpack(unpack) { } + vespalib::duration getDuration() const { return _duration; } + bool getUnpack() const { return _unpack; } +private: + const vespalib::duration _duration; + const bool _unpack; +}; + +} diff --git a/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp new file mode 100644 index 00000000000..bf437dd7ee3 --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.cpp @@ -0,0 +1,269 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "warmupindexcollection.h" +#include "idiskindex.h" +#include <vespa/searchlib/fef/matchdatalayout.h> +#include <vespa/searchlib/query/tree/termnodes.h> +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/stllike/hash_set.h> +#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/eval/eval/value.h> +#include <thread> + +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.warmupindexcollection"); + +namespace searchcorespi { + +using index::IDiskIndex; +using search::fef::MatchDataLayout; +using search::index::FieldLengthInfo; +using search::query::StringBase; +using search::queryeval::Blueprint; +using search::queryeval::ISourceSelector; +using search::queryeval::SearchIterator; +using TermMap = vespalib::hash_set<vespalib::string>; + +class FieldTermMap : public vespalib::hash_map<uint32_t, TermMap> +{ + +}; + +WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig, + ISearchableIndexCollection::SP prev, + ISearchableIndexCollection::SP next, + IndexSearchable & warmup, + vespalib::Executor & executor, + const vespalib::Clock & clock, + IWarmupDone & warmupDone) : + _warmupConfig(warmupConfig), + _prev(std::move(prev)), + _next(std::move(next)), + _warmup(warmup), + _executor(executor), + _clock(clock), + _warmupDone(warmupDone), + _warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()), + _handledTerms(std::make_unique<FieldTermMap>()), + _pendingTasks() +{ + if (_next->valid()) { + setCurrentIndex(_next->getCurrentIndex()); + } else { + LOG(warning, "Next index is not valid, Dangerous !! : %s", _next->toString().c_str()); + } + LOG(debug, "For %g seconds I will warm up '%s' %s unpack.", vespalib::to_s(warmupConfig.getDuration()), typeid(_warmup).name(), warmupConfig.getUnpack() ? "with" : "without"); + LOG(debug, "%s", toString().c_str()); +} + +void +WarmupIndexCollection::setSource(uint32_t docId) +{ + assert(_prev->valid()); + assert(_next->valid()); + _prev->setSource(docId); + _next->setSource(docId); +} + +vespalib::string +WarmupIndexCollection::toString() const +{ + vespalib::asciistream os; + os << "warmup : "; + if (dynamic_cast<const IDiskIndex *>(&_warmup) != nullptr) { + os << static_cast<const IDiskIndex &>(_warmup).getIndexDir(); + } else { + os << typeid(_warmup).name(); + } + os << "\n"; + os << "next : " << _next->toString() << "\n"; + os << "prev : " << _prev->toString() << "\n"; + return os.str(); +} + +WarmupIndexCollection::~WarmupIndexCollection() +{ + if (_warmupEndTime != vespalib::steady_time()) { + LOG(info, "Warmup aborted due to new state change or application shutdown"); + } + assert(_pendingTasks.has_zero_ref_count()); +} + +const ISourceSelector & +WarmupIndexCollection::getSourceSelector() const +{ + return _next->getSourceSelector(); +} + +size_t +WarmupIndexCollection::getSourceCount() const +{ + return _next->getSourceCount(); +} + +IndexSearchable & +WarmupIndexCollection::getSearchable(uint32_t i) const +{ + return _next->getSearchable(i); +} + +uint32_t +WarmupIndexCollection::getSourceId(uint32_t i) const +{ + return _next->getSourceId(i); +} + +void +WarmupIndexCollection::fireWarmup(Task::UP task) +{ + vespalib::steady_time now(vespalib::steady_clock::now()); + if (now < _warmupEndTime) { + _executor.execute(std::move(task)); + } else { + std::unique_lock<std::mutex> guard(_lock); + if (_warmupEndTime != vespalib::steady_time()) { + _warmupEndTime = vespalib::steady_time(); + guard.unlock(); + LOG(info, "Done warming up. Posting WarmupDoneTask"); + _warmupDone.warmupDone(shared_from_this()); + } + } +} + +bool +WarmupIndexCollection::handledBefore(uint32_t fieldId, const Node &term) +{ + const StringBase * sb(dynamic_cast<const StringBase *>(&term)); + if (sb != nullptr) { + const vespalib::string & s = sb->getTerm(); + std::lock_guard<std::mutex> guard(_lock); + TermMap::insert_result found = (*_handledTerms)[fieldId].insert(s); + return ! found.second; + } + return true; +} +Blueprint::UP +WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term) +{ + FieldSpecList fsl; + fsl.add(field); + return createBlueprint(requestContext, fsl,term); +} + +Blueprint::UP +WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term) +{ + if ( _warmupEndTime == vespalib::steady_time()) { + // warmup done + return _next->createBlueprint(requestContext, fields, term); + } + MatchDataLayout mdl; + FieldSpecList fsl; + bool needWarmUp(false); + for(size_t i(0); i < fields.size(); i++) { + const FieldSpec & f(fields[i]); + FieldSpec fs(f.getName(), f.getFieldId(), mdl.allocTermField(f.getFieldId()), f.isFilter()); + fsl.add(fs); + needWarmUp = needWarmUp || ! handledBefore(fs.getFieldId(), term); + } + if (needWarmUp) { + auto task = std::make_unique<WarmupTask>(mdl.createMatchData(), shared_from_this()); + task->createBlueprint(fsl, term); + fireWarmup(std::move(task)); + } + return _prev->createBlueprint(requestContext, fields, term); +} + +search::SearchableStats +WarmupIndexCollection::getSearchableStats() const +{ + return _prev->getSearchableStats(); +} + + +search::SerialNum +WarmupIndexCollection::getSerialNum() const +{ + return std::max(_prev->getSerialNum(), _next->getSerialNum()); +} + + +void +WarmupIndexCollection::accept(IndexSearchableVisitor &visitor) const +{ + _prev->accept(visitor); + _next->accept(visitor); +} + +FieldLengthInfo +WarmupIndexCollection::get_field_length_info(const vespalib::string& field_name) const +{ + return _next->get_field_length_info(field_name); +} + +void +WarmupIndexCollection::append(uint32_t id, const IndexSearchable::SP &source) +{ + _next->append(id, source); +} + +void +WarmupIndexCollection::replace(uint32_t id, const IndexSearchable::SP &source) +{ + _next->replace(id, source); +} + +IndexSearchable::SP +WarmupIndexCollection::getSearchableSP(uint32_t i) const +{ + return _next->getSearchableSP(i); +} + +void +WarmupIndexCollection::drainPending() { + _pendingTasks.waitForZeroRefCount(); +} + +WarmupIndexCollection::WarmupRequestContext::WarmupRequestContext(const vespalib::Clock & clock) + : _doom(clock, vespalib::steady_time::max(), vespalib::steady_time::max(), false) +{} +WarmupIndexCollection::WarmupRequestContext::~WarmupRequestContext() = default; + +std::unique_ptr<vespalib::eval::Value> +WarmupIndexCollection::WarmupRequestContext::get_query_tensor(const vespalib::string&) const { + return {}; +} +WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup) + : _warmup(std::move(warmup)), + _retainGuard(_warmup->_pendingTasks), + _matchData(std::move(md)), + _bluePrint(), + _requestContext(_warmup->_clock) +{ +} + +WarmupIndexCollection::WarmupTask::~WarmupTask() = default; + +void +WarmupIndexCollection::WarmupTask::run() +{ + if (_warmup->_warmupEndTime != vespalib::steady_time()) { + LOG(debug, "Warming up %s", _bluePrint->asString().c_str()); + _bluePrint->fetchPostings(search::queryeval::ExecuteInfo::TRUE); + SearchIterator::UP it(_bluePrint->createSearch(*_matchData, true)); + it->initFullRange(); + for (uint32_t docId = it->seekFirst(1); !it->isAtEnd(); docId = it->seekNext(docId+1)) { + if (_warmup->doUnpack()) { + it->unpack(docId); + } + } + } else { + LOG(debug, "Warmup has finished, ignoring task."); + } +} + +} diff --git a/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h new file mode 100644 index 00000000000..c2d70b4fd5c --- /dev/null +++ b/searchcore/src/vespa/searchcorespi/index/warmupindexcollection.h @@ -0,0 +1,128 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "isearchableindexcollection.h" +#include "warmupconfig.h" +#include <vespa/searchlib/attribute/attribute_blueprint_params.h> +#include <vespa/vespalib/util/doom.h> +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> + +namespace searchcorespi { + +class FieldTermMap; +class WarmupIndexCollection; + +class IWarmupDone { +public: + virtual ~IWarmupDone() { } + virtual void warmupDone(std::shared_ptr<WarmupIndexCollection> current) = 0; +}; +/** + * Index collection that holds a reference to the active one and a new one that + * is to be warmed up. + */ +class WarmupIndexCollection : public ISearchableIndexCollection, + public std::enable_shared_from_this<WarmupIndexCollection> +{ + using WarmupConfig = index::WarmupConfig; +public: + typedef std::shared_ptr<WarmupIndexCollection> SP; + WarmupIndexCollection(const WarmupConfig & warmupConfig, + ISearchableIndexCollection::SP prev, + ISearchableIndexCollection::SP next, + IndexSearchable & warmup, + vespalib::Executor & executor, + const vespalib::Clock & clock, + IWarmupDone & warmupDone); + ~WarmupIndexCollection() override; + // Implements IIndexCollection + const ISourceSelector &getSourceSelector() const override; + size_t getSourceCount() const override; + IndexSearchable &getSearchable(uint32_t i) const override; + uint32_t getSourceId(uint32_t i) const override; + + // Implements IndexSearchable + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term) override; + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term) override; + search::SearchableStats getSearchableStats() const override; + search::SerialNum getSerialNum() const override; + void accept(IndexSearchableVisitor &visitor) const override; + + // Implements IFieldLengthInspector + search::index::FieldLengthInfo get_field_length_info(const vespalib::string& field_name) const override; + + // Implements ISearchableIndexCollection + void append(uint32_t id, const IndexSearchable::SP &source) override; + void replace(uint32_t id, const IndexSearchable::SP &source) override; + IndexSearchable::SP getSearchableSP(uint32_t i) const override; + void setSource(uint32_t docId) override; + + const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; } + vespalib::string toString() const override; + bool doUnpack() const { return _warmupConfig.getUnpack(); } + void drainPending(); +private: + typedef search::fef::MatchData MatchData; + typedef vespalib::Executor::Task Task; + class WarmupRequestContext : public IRequestContext { + using IAttributeVector = search::attribute::IAttributeVector; + using AttributeBlueprintParams = search::attribute::AttributeBlueprintParams; + public: + WarmupRequestContext(const vespalib::Clock & clock); + ~WarmupRequestContext() override; + const vespalib::Doom & getDoom() const override { return _doom; } + const IAttributeVector *getAttribute(const vespalib::string &) const override { return nullptr; } + const IAttributeVector *getAttributeStableEnum(const vespalib::string &) const override { return nullptr; } + std::unique_ptr<vespalib::eval::Value> get_query_tensor(const vespalib::string&) const override; + const AttributeBlueprintParams& get_attribute_blueprint_params() const override { return _params; } + private: + const vespalib::Doom _doom; + const AttributeBlueprintParams _params; + }; + class WarmupTask : public Task { + public: + WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup); + ~WarmupTask() override; + WarmupTask &createBlueprint(const FieldSpec &field, const Node &term) { + _bluePrint = _warmup->createBlueprint(_requestContext, field, term); + return *this; + } + WarmupTask &createBlueprint(const FieldSpecList &fields, const Node &term) { + _bluePrint = _warmup->createBlueprint(_requestContext, fields, term); + return *this; + } + private: + void run() override; + std::shared_ptr<WarmupIndexCollection> _warmup; + vespalib::RetainGuard _retainGuard; + std::unique_ptr<MatchData> _matchData; + Blueprint::UP _bluePrint; + WarmupRequestContext _requestContext; + }; + + void fireWarmup(Task::UP task); + bool handledBefore(uint32_t fieldId, const Node &term); + + const WarmupConfig _warmupConfig; + ISearchableIndexCollection::SP _prev; + ISearchableIndexCollection::SP _next; + IndexSearchable & _warmup; + vespalib::Executor & _executor; + const vespalib::Clock & _clock; + IWarmupDone & _warmupDone; + vespalib::steady_time _warmupEndTime; + std::mutex _lock; + std::unique_ptr<FieldTermMap> _handledTerms; + vespalib::MonitoredRefCount _pendingTasks; +}; + +} // namespace searchcorespi |