summaryrefslogtreecommitdiffstats
path: root/searchlib/src/tests
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2021-01-05 12:51:15 +0100
committerTor Egge <Tor.Egge@broadpark.no>2021-01-05 14:58:59 +0100
commit42a02eddf6b450682994afae68249171fe7876b8 (patch)
tree6a0a14eac83492008788f41baf81c6dcc87d0a5d /searchlib/src/tests
parent6382cb8513ab166e4e4184e0ddebd60f97fb6bb3 (diff)
Add low level support for stopping a running disk index fusion.
Diffstat (limited to 'searchlib/src/tests')
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp81
1 files changed, 71 insertions, 10 deletions
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index c94d19cb3b7..efc9e99bf88 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchlib/common/flush_token.h>
#include <vespa/searchlib/diskindex/diskindex.h>
#include <vespa/searchlib/diskindex/fusion.h>
#include <vespa/searchlib/diskindex/indexbuilder.h>
@@ -61,6 +62,7 @@ protected:
void requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap);
void make_simple_index(const vespalib::string &dump_dir, const IFieldLengthInspector &field_length_inspector);
+ bool try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token);
void merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources);
public:
FusionTest();
@@ -390,7 +392,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing,fileHeaderContext, executor));
+ tuneFileIndexing,fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -403,7 +405,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema2, prefix + "dump4", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw4(prefix + "dump4");
@@ -416,7 +418,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema3, prefix + "dump5", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw5(prefix + "dump5");
@@ -429,7 +431,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump3");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump6", sources, selector,
!dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw6(prefix + "dump6");
@@ -442,7 +444,7 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
sources.push_back(prefix + "dump2");
ASSERT_TRUE(Fusion::merge(schema, prefix + "dump3", sources, selector,
dynamicKPosOcc,
- tuneFileIndexing, fileHeaderContext, executor));
+ tuneFileIndexing, fileHeaderContext, executor, std::make_shared<FlushToken>()));
} while (0);
do {
DiskIndex dw3(prefix + "dump3");
@@ -476,16 +478,22 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
ib.close();
}
-void
-FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources)
+bool
+FusionTest::try_merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources, std::shared_ptr<IFlushToken> flush_token)
{
vespalib::ThreadStackExecutor executor(4, 0x10000);
TuneFileIndexing tuneFileIndexing;
DummyFileHeaderContext fileHeaderContext;
SelectorArray selector(20, 0);
- ASSERT_TRUE(Fusion::merge(_schema, dump_dir, sources, selector,
- false,
- tuneFileIndexing, fileHeaderContext, executor));
+ return Fusion::merge(_schema, dump_dir, sources, selector,
+ false,
+ tuneFileIndexing, fileHeaderContext, executor, flush_token);
+}
+
+void
+FusionTest::merge_simple_indexes(const vespalib::string &dump_dir, const std::vector<vespalib::string> &sources)
+{
+ ASSERT_TRUE(try_merge_simple_indexes(dump_dir, sources, std::make_shared<FlushToken>()));
}
FusionTest::FusionTest()
@@ -553,6 +561,59 @@ TEST_F(FusionTest, require_that_interleaved_features_can_be_reconstructed)
clean_field_length_testdirs();
}
+namespace {
+
+void clean_stopped_fusion_testdirs()
+{
+ vespalib::rmdir("stopdump2", true);
+ vespalib::rmdir("stopdump3", true);
+}
+
+class MyFlushToken : public FlushToken
+{
+ mutable std::atomic<size_t> _checks;
+ const size_t _limit;
+public:
+ MyFlushToken(size_t limit)
+ : FlushToken(),
+ _checks(0u),
+ _limit(limit)
+ {
+ }
+ ~MyFlushToken() override = default;
+ bool stop_requested() const noexcept override;
+ size_t get_checks() const noexcept { return _checks; }
+};
+
+bool
+MyFlushToken::stop_requested() const noexcept
+{
+ if (++_checks >= _limit) {
+ const_cast<MyFlushToken *>(this)->request_stop();
+ }
+ return FlushToken::stop_requested();
+}
+
+}
+
+TEST_F(FusionTest, require_that_fusion_can_be_stopped)
+{
+ clean_stopped_fusion_testdirs();
+ auto flush_token = std::make_shared<MyFlushToken>(10000);
+ make_simple_index("stopdump2", MockFieldLengthInspector());
+ ASSERT_TRUE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(40, flush_token->get_checks());
+ vespalib::rmdir("stopdump3", true);
+ flush_token = std::make_shared<MyFlushToken>(1);
+ ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(12, flush_token->get_checks());
+ vespalib::rmdir("stopdump3", true);
+ flush_token = std::make_shared<MyFlushToken>(39);
+ ASSERT_FALSE(try_merge_simple_indexes("stopdump3", {"stopdump2"}, flush_token));
+ EXPECT_EQ(41, flush_token->get_checks());
+ clean_stopped_fusion_testdirs();
+}
+
}
}