diff options
author | Tor Egge <Tor.Egge@online.no> | 2022-01-12 17:19:28 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2022-01-12 17:19:28 +0100 |
commit | 5d48b71920508259eee105a5aaba4b2c3f1a8a7b (patch) | |
tree | 72394d8fc8ff385df1ffc35246ca01035b2753d4 /searchcorespi | |
parent | a0fa1048ba798b6b7efbd26a882c64c783dd4c01 (diff) |
Consider disk space used by fusion indexes beyond current fusion index or
indexes before current fusion index as transient disk space.
Diffstat (limited to 'searchcorespi')
8 files changed, 263 insertions, 26 deletions
diff --git a/searchcorespi/src/tests/index/active_disk_indexes/active_disk_indexes_test.cpp b/searchcorespi/src/tests/index/active_disk_indexes/active_disk_indexes_test.cpp index a6d412817d9..fb8498abce2 100644 --- a/searchcorespi/src/tests/index/active_disk_indexes/active_disk_indexes_test.cpp +++ b/searchcorespi/src/tests/index/active_disk_indexes/active_disk_indexes_test.cpp @@ -2,21 +2,43 @@ #include <vespa/searchcorespi/index/activediskindexes.h> #include <vespa/searchcorespi/index/index_disk_dir.h> +#include <vespa/searchcorespi/index/indexdisklayout.h> +#include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/gtest/gtest.h> +#include <fstream> + +namespace { + +vespalib::string base_dir("base"); + +constexpr uint32_t block_size = 4_Ki; + +} namespace searchcorespi::index { class ActiveDiskIndexesTest : public ::testing::Test, public ActiveDiskIndexes { + IndexDiskLayout _layout; protected: ActiveDiskIndexesTest(); ~ActiveDiskIndexesTest(); + + static IndexDiskDir get_index_disk_dir(const vespalib::string& dir) { + return IndexDiskLayout::get_index_disk_dir(dir); + } + + void assert_transient_size(uint64_t exp, IndexDiskDir index_disk_dir) { + EXPECT_EQ(exp, get_transient_size(_layout, index_disk_dir)); + } }; ActiveDiskIndexesTest::ActiveDiskIndexesTest() : ::testing::Test(), - ActiveDiskIndexes() + ActiveDiskIndexes(), + _layout(base_dir) { } @@ -25,7 +47,7 @@ ActiveDiskIndexesTest::~ActiveDiskIndexesTest() = default; TEST_F(ActiveDiskIndexesTest, simple_set_active_works) { EXPECT_FALSE(isActive("index.flush.1")); - setActive("index.flush.1"); + setActive("index.flush.1", 0); EXPECT_TRUE(isActive("index.flush.1")); notActive("index.flush.1"); EXPECT_FALSE(isActive("index.flush.1")); @@ -33,8 +55,8 @@ TEST_F(ActiveDiskIndexesTest, simple_set_active_works) TEST_F(ActiveDiskIndexesTest, nested_set_active_works) { - setActive("index.flush.1"); - setActive("index.flush.1"); + setActive("index.flush.1", 0); + setActive("index.flush.1", 0); EXPECT_TRUE(isActive("index.flush.1")); notActive("index.flush.1"); EXPECT_TRUE(isActive("index.flush.1")); @@ -48,6 +70,90 @@ TEST_F(ActiveDiskIndexesTest, is_active_returns_false_for_bad_name) EXPECT_FALSE(isActive("index.flush.0")); } +TEST_F(ActiveDiskIndexesTest, remove_works) +{ + EXPECT_TRUE(remove(IndexDiskDir())); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + EXPECT_TRUE(remove(fusion1)); + add_not_active(fusion1); + EXPECT_TRUE(remove(fusion1)); + setActive("index.fusion.1", 0); + EXPECT_FALSE(remove(fusion1)); + notActive("index.fusion.1"); + EXPECT_TRUE(remove(fusion1)); +} + +TEST_F(ActiveDiskIndexesTest, basic_get_transient_size_works) +{ + setActive("index.fusion.1", 1000000); + setActive("index.flush.2", 500000); + setActive("index.fusion.2", 1200000); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + auto flush2 = get_index_disk_dir("index.flush.2"); + auto fusion2 = get_index_disk_dir("index.fusion.2"); + { + SCOPED_TRACE("index.fusion.1"); + assert_transient_size(1200000, fusion1); + } + { + SCOPED_TRACE("index.flush.2"); + assert_transient_size(0, flush2); + } + { + SCOPED_TRACE("index.fusion.2"); + assert_transient_size(1500000, fusion2); + } + notActive("index.fusion.2"); + { + SCOPED_TRACE("index.fusion.1 after remove of index.fusion.2"); + assert_transient_size(0, fusion1); + } +} + +TEST_F(ActiveDiskIndexesTest, dynamic_get_transient_size_works) +{ + setActive("index.fusion.1", 1000000); + auto fusion1 = get_index_disk_dir("index.fusion.1"); + auto fusion2 = get_index_disk_dir("index.fusion.2"); + add_not_active(fusion2); + { + SCOPED_TRACE("dir missing"); + assert_transient_size(0, fusion1); + } + auto dir = base_dir + "/index.fusion.2"; + vespalib::mkdir(dir, true); + { + SCOPED_TRACE("empty dir"); + assert_transient_size(0, fusion1); + } + constexpr uint32_t seek_pos = 999999; + { + std::string name = dir + "/foo"; + std::ofstream ostr(name, std::ios::binary); + ostr.seekp(seek_pos); + ostr.write(" ", 1); + ostr.flush(); + ostr.close(); + } + { + SCOPED_TRACE("single file"); + assert_transient_size((seek_pos + block_size) / block_size * block_size, fusion1); + } + EXPECT_TRUE(remove(fusion2)); + { + SCOPED_TRACE("removed"); + assert_transient_size(0, fusion1); + } } -GTEST_MAIN_RUN_ALL_TESTS() +} + +int +main(int argc, char* argv[]) +{ + vespalib::rmdir(base_dir, true); + ::testing::InitGoogleTest(&argc, argv); + auto result = RUN_ALL_TESTS(); + vespalib::rmdir(base_dir, true); + return result; +} diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp index fb9585fb58e..ddee57ea873 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp @@ -4,7 +4,9 @@ #include "indexdisklayout.h" #include "index_disk_dir.h" #include "index_disk_dir_active_state.h" +#include <vespa/searchlib/util/dirtraverse.h> #include <cassert> +#include <vector> using vespalib::string; @@ -13,12 +15,17 @@ namespace searchcorespi::index { ActiveDiskIndexes::ActiveDiskIndexes() = default; ActiveDiskIndexes::~ActiveDiskIndexes() = default; -void ActiveDiskIndexes::setActive(const string &index) { +void +ActiveDiskIndexes::setActive(const string &index, uint64_t size_on_disk) +{ auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(index); assert(index_disk_dir.valid()); std::lock_guard lock(_lock); auto insres = _active.insert(std::make_pair(index_disk_dir, IndexDiskDirActiveState())); insres.first->second.activate(); + if (!insres.first->second.get_size_on_disk().has_value()) { + insres.first->second.set_size_on_disk(size_on_disk); + } } void ActiveDiskIndexes::notActive(const string & index) { @@ -44,4 +51,66 @@ bool ActiveDiskIndexes::isActive(const string &index) const { return (it != _active.end()) && it->second.is_active(); } + +void +ActiveDiskIndexes::add_not_active(IndexDiskDir index_disk_dir) +{ + std::lock_guard lock(_lock); + _active.insert(std::make_pair(index_disk_dir, IndexDiskDirActiveState())); +} + +bool +ActiveDiskIndexes::remove(IndexDiskDir index_disk_dir) +{ + if (!index_disk_dir.valid()) { + return true; + } + std::lock_guard lock(_lock); + auto it = _active.find(index_disk_dir); + if (it == _active.end()) { + return true; + } + if (it->second.is_active()) { + return false; + } + _active.erase(it); + return true; +} + +uint64_t +ActiveDiskIndexes::get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const +{ + if (!index_disk_dir.valid() || !index_disk_dir.get_fusion()) { + return 0u; + } + uint64_t transient_size = 0u; + std::vector<IndexDiskDir> deferred; + { + std::lock_guard lock(_lock); + for (auto &entry : _active) { + if (entry.first < index_disk_dir) { + if (entry.second.get_size_on_disk().has_value()) { + transient_size += entry.second.get_size_on_disk().value(); + } + } + if (index_disk_dir < entry.first && entry.first.get_fusion()) { + if (entry.second.get_size_on_disk().has_value()) { + transient_size += entry.second.get_size_on_disk().value(); + } else { + deferred.emplace_back(entry.first); + } + } + } + } + for (auto& entry : deferred) { + auto index_dir = layout.getFusionDir(entry.get_id()); + try { + search::DirectoryTraverse dirt(index_dir.c_str()); + transient_size += dirt.GetTreeSize(); + } catch (std::exception &) { + } + } + return transient_size; +} + } diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h index 365025e7450..4eed7ae47f9 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h +++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h @@ -11,6 +11,7 @@ namespace searchcorespi::index { class IndexDiskDir; class IndexDiskDirActiveState; +class IndexDiskLayout; /** * Class used to keep track of the set of active disk indexes in an index maintainer. @@ -26,9 +27,12 @@ public: ~ActiveDiskIndexes(); ActiveDiskIndexes(const ActiveDiskIndexes &) = delete; ActiveDiskIndexes & operator = (const ActiveDiskIndexes &) = delete; - void setActive(const vespalib::string & index); + void setActive(const vespalib::string & index, uint64_t size_on_disk); void notActive(const vespalib::string & index); bool isActive(const vespalib::string & index) const; + void add_not_active(IndexDiskDir index_disk_dir); + bool remove(IndexDiskDir index_disk_dir); + uint64_t get_transient_size(IndexDiskLayout& layout, IndexDiskDir index_disk_dir) const; }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp index cce880eed3f..c1ee0b441c9 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp @@ -2,6 +2,8 @@ #include "diskindexcleaner.h" #include "activediskindexes.h" +#include "indexdisklayout.h" +#include "index_disk_dir.h" #include <vespa/fastos/file.h> #include <vespa/vespalib/io/fileutil.h> #include <sstream> @@ -80,13 +82,13 @@ bool isOldIndex(const string &index, uint32_t last_fusion_id) { } void removeOld(const string &base_dir, const vector<string> &indexes, - const ActiveDiskIndexes &active_indexes, bool remove) { + ActiveDiskIndexes &active_indexes, bool remove) { uint32_t last_fusion_id = findLastFusionId(base_dir, indexes); for (size_t i = 0; i < indexes.size(); ++i) { const string index_dir = base_dir + "/" + indexes[i]; + auto index_disk_dir = IndexDiskLayout::get_index_disk_dir(indexes[i]); if (isOldIndex(indexes[i], last_fusion_id) && - !active_indexes.isActive(index_dir)) - { + active_indexes.remove(index_disk_dir)) { if (remove) { removeDir(index_dir); } else { @@ -108,14 +110,14 @@ void removeInvalid(const string &base_dir, const vector<string> &indexes) { } // namespace void DiskIndexCleaner::clean(const string &base_dir, - const ActiveDiskIndexes &active_indexes) { + ActiveDiskIndexes &active_indexes) { vector<string> indexes = readIndexes(base_dir); removeOld(base_dir, indexes, active_indexes, false); removeInvalid(base_dir, indexes); } void DiskIndexCleaner::removeOldIndexes( - const string &base_dir, const ActiveDiskIndexes &active_indexes) { + const string &base_dir, ActiveDiskIndexes &active_indexes) { vector<string> indexes = readIndexes(base_dir); removeOld(base_dir, indexes, active_indexes, true); } diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h index 798193ab00b..50179087de6 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h +++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h @@ -16,9 +16,9 @@ struct DiskIndexCleaner { * Deletes all indexes with id lower than the most recent fusion id. */ static void clean(const vespalib::string &index_dir, - const ActiveDiskIndexes& active_indexes); + ActiveDiskIndexes& active_indexes); static void removeOldIndexes(const vespalib::string &index_dir, - const ActiveDiskIndexes& active_indexes); + ActiveDiskIndexes& active_indexes); }; } // namespace index diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir.h b/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir.h index 278fd73e555..dc983830e49 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir.h +++ b/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir.h @@ -29,6 +29,8 @@ public: return (_id == rhs._id) && (_fusion == rhs._fusion); } bool valid() const noexcept { return _id != 0u; } + bool get_fusion() const noexcept { return _fusion; } + uint64_t get_id() const noexcept { return _id; } }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir_active_state.h b/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir_active_state.h index ac84de5adee..fb6b63bfb48 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir_active_state.h +++ b/searchcorespi/src/vespa/searchcorespi/index/index_disk_dir_active_state.h @@ -3,6 +3,7 @@ #pragma once #include <cstdint> +#include <optional> namespace searchcorespi::index { @@ -11,15 +12,19 @@ namespace searchcorespi::index { */ class IndexDiskDirActiveState { uint32_t _active_count; + std::optional<uint64_t> _size_on_disk; public: IndexDiskDirActiveState() - : _active_count(0) + : _active_count(0), + _size_on_disk() { } void activate() noexcept { ++_active_count; } void deactivate() noexcept; bool is_active() const noexcept { return _active_count != 0; } + const std::optional<uint64_t>& get_size_on_disk() const noexcept { return _size_on_disk; } + void set_size_on_disk(uint64_t size_on_disk) noexcept { _size_on_disk = size_on_disk; } }; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index 6489cb156ce..43ecfe8e99d 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -8,6 +8,7 @@ #include "indexmaintainer.h" #include "indexreadutilities.h" #include "indexwriteutilities.h" +#include "index_disk_dir.h" #include <vespa/searchcorespi/flush/lambdaflushtask.h> #include <vespa/searchlib/common/i_flush_token.h> #include <vespa/searchlib/index/schemautil.h> @@ -88,13 +89,22 @@ class DiskIndexWithDestructorCallback : public IDiskIndex { private: std::shared_ptr<IDestructorCallback> _callback; IDiskIndex::SP _index; + IndexDiskDir _index_disk_dir; + IndexDiskLayout& _layout; + ActiveDiskIndexes& _active_indexes; public: DiskIndexWithDestructorCallback(IDiskIndex::SP index, - std::shared_ptr<IDestructorCallback> callback) noexcept + std::shared_ptr<IDestructorCallback> callback, + IndexDiskLayout& layout, + ActiveDiskIndexes& active_indexes) noexcept : _callback(std::move(callback)), - _index(std::move(index)) - { } + _index(std::move(index)), + _index_disk_dir(IndexDiskLayout::get_index_disk_dir(_index->getIndexDir())), + _layout(layout), + _active_indexes(active_indexes) + { + } ~DiskIndexWithDestructorCallback() override; const IDiskIndex &getWrapped() const { return *_index; } @@ -117,8 +127,7 @@ public: { return _index->createBlueprint(requestContext, fields, term); } - // TODO: Calculate the total disk size of current fusion indexes and set fusion_size_on_disk(). - search::SearchableStats getSearchableStats() const override { return _index->getSearchableStats(); } + search::SearchableStats getSearchableStats() const override; search::SerialNum getSerialNum() const override { return _index->getSerialNum(); } @@ -143,6 +152,16 @@ public: DiskIndexWithDestructorCallback::~DiskIndexWithDestructorCallback() = default; +search::SearchableStats +DiskIndexWithDestructorCallback::getSearchableStats() const +{ + auto stats = _index->getSearchableStats(); + uint64_t transient_size = _active_indexes.get_transient_size(_layout, _index_disk_dir); + stats.fusion_size_on_disk(transient_size); + return stats; +} + + } // namespace IndexMaintainer::FusionArgs::FusionArgs() @@ -284,10 +303,13 @@ IndexMaintainer::loadDiskIndex(const string &indexDir) EventLogger::diskIndexLoadStart(indexDir); } vespalib::Timer timer; - _active_indexes->setActive(indexDir); + auto index = _operations.loadDiskIndex(indexDir); + auto stats = index->getSearchableStats(); + _active_indexes->setActive(indexDir, stats.sizeOnDisk()); auto retval = std::make_shared<DiskIndexWithDestructorCallback>( - _operations.loadDiskIndex(indexDir), - makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); })); + std::move(index), + makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + _layout, *_active_indexes); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); } @@ -303,11 +325,14 @@ IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) EventLogger::diskIndexLoadStart(indexDir); } vespalib::Timer timer; - _active_indexes->setActive(indexDir); const IDiskIndex &wrappedDiskIndex = (dynamic_cast<const DiskIndexWithDestructorCallback &>(oldIndex)).getWrapped(); + auto index = _operations.reloadDiskIndex(wrappedDiskIndex); + auto stats = index->getSearchableStats(); + _active_indexes->setActive(indexDir, stats.sizeOnDisk()); auto retval = std::make_shared<DiskIndexWithDestructorCallback>( - _operations.reloadDiskIndex(wrappedDiskIndex), - makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); })); + std::move(index), + makeLambdaCallback([this, indexDir]() { deactivateDiskIndexes(indexDir); }), + _layout, *_active_indexes); if (LOG_WOULD_LOG(event)) { EventLogger::diskIndexLoadComplete(indexDir, vespalib::count_ms(timer.elapsed())); } @@ -1013,6 +1038,27 @@ IndexMaintainer::doFusion(SerialNum serialNum, std::shared_ptr<search::IFlushTok return getFusionDir(new_fusion_id); } +namespace { + +class RemoveFusionIndexGuard { + ActiveDiskIndexes* _active_indexes; + IndexDiskDir _index_disk_dir; +public: + RemoveFusionIndexGuard(ActiveDiskIndexes& active_indexes, IndexDiskDir index_disk_dir) + : _active_indexes(&active_indexes), + _index_disk_dir(index_disk_dir) + { + _active_indexes->add_not_active(index_disk_dir); + } + ~RemoveFusionIndexGuard() { + if (_active_indexes != nullptr) { + (void) _active_indexes->remove(_index_disk_dir); + } + } + void reset() { _active_indexes = nullptr; } +}; + +} uint32_t IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search::IFlushToken> flush_token) @@ -1034,6 +1080,8 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search if (FastOS_File::Stat(lastSerialFile.c_str(), &statInfo)) { serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); } + IndexDiskDir fusion_index_disk_dir(fusion_spec.flush_ids.back(), true); + RemoveFusionIndexGuard remove_fusion_index_guard(*_active_indexes, fusion_index_disk_dir); FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations, flush_token); bool ok = (new_fusion_id != 0); @@ -1066,6 +1114,7 @@ IndexMaintainer::runFusion(const FusionSpec &fusion_spec, std::shared_ptr<search } ChangeGens changeGens = getChangeGens(); IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir)); + remove_fusion_index_guard.reset(); // Post processing after fusion operation has completed and new disk // index has been opened. |