diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-05 12:51:15 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2021-01-05 14:58:59 +0100 |
commit | 42a02eddf6b450682994afae68249171fe7876b8 (patch) | |
tree | 6a0a14eac83492008788f41baf81c6dcc87d0a5d /searchlib/src/tests | |
parent | 6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff) |
Add low level support for stopping a running disk index fusion.
Diffstat (limited to 'searchlib/src/tests')
-rw-r--r-- | searchlib/src/tests/diskindex/fusion/fusion_test.cpp | 81 |
1 files changed, 71 insertions, 10 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index c94d19cb3b7..efc9e99bf88 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchlib/common/flush_token.h> #include <vespa/searchlib/diskindex/diskindex.h> #include <vespa/searchlib/diskindex/fusion.h> #include <vespa/searchlib/diskindex/indexbuilder.h> @@ -61,6 +62,7 @@ protected: void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap); void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector); + bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token); void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources); public: FusionTest(); @@ -390,7 +392,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing,fileHeaderContext, executor)); + tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -403,7 +405,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw4(prefix + "dump4"); @@ -416,7 +418,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw5(prefix + "dump5"); @@ -429,7 +431,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump3"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector, !dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw6(prefix + "dump6"); @@ -442,7 +444,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire sources.push_back(prefix + "dump2"); ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector, dynamicKPosOcc, - tuneFileIndexing, fileHeaderContext, executor)); + tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>())); } while (0); do { DiskIndex dw3(prefix + "dump3"); @@ -476,16 +478,22 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng ib.close(); } -void -FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources) +bool +FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token) { vespalib::ThreadStackExecutor executor(4, 0x10000); TuneFileIndexing tuneFileIndexing; DummyFileHeaderContext fileHeaderContext; SelectorArray selector(20, 0); - ASSERT_TRUE(Fusion::merge(_schema, dump_dir, sources, selector, - false, - tuneFileIndexing, fileHeaderContext, executor)); + return Fusion::merge(_schema, dump_dir, sources, selector, + false, + tuneFileIndexing, fileHeaderContext, executor, flush_token); +} + +void +FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources) +{ + ASSERT_TRUE(try_merge_simple_indexes(dump_dir, sources, std::make_shared<FlushToken>())); } FusionTest::FusionTest() @@ -553,6 +561,59 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed) clean_field_length_testdirs(); } +namespace { + +void clean_stopped_fusion_testdirs() +{ + vespalib::rmdir("stopdump2", true); + vespalib::rmdir("stopdump3", true); +} + +class MyFlushToken : public FlushToken +{ + mutable std::atomic<size_t> _checks; + const size_t _limit; +public: + MyFlushToken(size_t limit) + : FlushToken(), + _checks(0u), + _limit(limit) + { + } + ~MyFlushToken() override = default; + bool stop_requested() const noexcept override; + size_t get_checks() const noexcept { return _checks; } +}; + +bool +MyFlushToken::stop_requested() const noexcept +{ + if (++_checks >= _limit) { + const_cast<MyFlushToken *>(this)->request_stop(); + } + return FlushToken::stop_requested(); +} + +} + +TEST_F(FusionTest, require_that_fusion_can_be_stopped) +{ + clean_stopped_fusion_testdirs(); + auto flush_token = std::make_shared<MyFlushToken>(10000); + make_simple_index("stopdump2", MockFieldLengthInspector()); + ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(40, flush_token->get_checks()); + vespalib::rmdir("stopdump3", true); + flush_token = std::make_shared<MyFlushToken>(1); + ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(12, flush_token->get_checks()); + vespalib::rmdir("stopdump3", true); + flush_token = std::make_shared<MyFlushToken>(39); + ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token)); + EXPECT_EQ(41, flush_token->get_checks()); + clean_stopped_fusion_testdirs(); +} + } } |