summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-31 21:33:58 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-01-31 21:33:58 +0000
commita344dd17d18cf12805b977986f0bf36d0191bfe8 (patch)
tree4dacb9ca69e0aeecd3f0e116761acfc3f40e5fc1
parentb39f3bd47b9b915a061bc28c3203f5eca5c541ca (diff)
Add test that verifies that concurrency is upheld during triggerFlush too.
-rw-r--r--searchcore/src/tests/proton/flushengine/flushengine_test.cpp106
1 files changed, 51 insertions, 55 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
index b1c188b2f9f..d5823a8e055 100644
--- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
+++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp
@@ -42,8 +42,7 @@ public:
public:
SimpleExecutor()
: _done()
- {
- }
+ { }
Task::UP
execute(Task::UP task) override
@@ -83,8 +82,7 @@ public:
SimpleHandler &handler)
: _task(std::move(task)),
_handler(handler)
- {
- }
+ { }
search::SerialNum getFlushSerial() const override {
return _task->getFlushSerial();
@@ -95,19 +93,15 @@ class WrappedFlushTarget : public FlushTargetProxy
{
SimpleHandler &_handler;
public:
- WrappedFlushTarget(const IFlushTarget::SP &target,
- SimpleHandler &handler)
+ WrappedFlushTarget(const IFlushTarget::SP &target, SimpleHandler &handler)
: FlushTargetProxy(target),
_handler(handler)
- {
- }
+ { }
- Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override
- {
+ Task::UP initFlush(SerialNum currentSerial, std::shared_ptr<search::IFlushToken> flush_token) override {
Task::UP task(_target->initFlush(currentSerial, std::move(flush_token)));
if (task) {
- return std::make_unique<WrappedFlushTask>(std::move(task),
- _handler);
+ return std::make_unique<WrappedFlushTask>(std::move(task), _handler);
}
return task;
}
@@ -140,33 +134,25 @@ public:
_lock(),
_done(targets.size()),
_flushDoneHistory()
- {
- }
+ { }
- search::SerialNum
- getCurrentSerialNumber() const override
- {
- LOG(info, "SimpleHandler(%s)::getCurrentSerialNumber()",
- getName().c_str());
+ search::SerialNum getCurrentSerialNumber() const override {
+ LOG(info, "SimpleHandler(%s)::getCurrentSerialNumber()", getName().c_str());
return _currentSerial;
}
std::vector<IFlushTarget::SP>
- getFlushTargets() override
- {
- LOG(info, "SimpleHandler(%s)::getFlushTargets()",
- getName().c_str());
+ getFlushTargets() override {
+ LOG(info, "SimpleHandler(%s)::getFlushTargets()", getName().c_str());
std::vector<IFlushTarget::SP> wrappedTargets;
for (const auto &target : _targets) {
- wrappedTargets.push_back(std::make_shared<WrappedFlushTarget>
- (target, *this));
+ wrappedTargets.push_back(std::make_shared<WrappedFlushTarget>(target, *this));
}
return wrappedTargets;
}
// Called once by flush engine thread for each task done
- void taskDone()
- {
+ void taskDone() {
std::lock_guard<std::mutex> guard(_lock);
++_pendingDone;
}
@@ -174,12 +160,9 @@ public:
// Called by flush engine master thread after flush handler is
// added to flush engine and when one or more flush tasks related
// to flush handler have completed.
- void
- flushDone(search::SerialNum oldestSerial) override
- {
+ void flushDone(search::SerialNum oldestSerial) override {
std::lock_guard<std::mutex> guard(_lock);
- LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")",
- getName().c_str(), oldestSerial);
+ LOG(info, "SimpleHandler(%s)::flushDone(%" PRIu64 ")", getName().c_str(), oldestSerial);
_oldestSerial = std::max(_oldestSerial, oldestSerial);
_flushDoneHistory.push_back(oldestSerial);
while (_pendingDone > 0) {
@@ -188,8 +171,7 @@ public:
}
}
- FlushDoneHistory getFlushDoneHistory()
- {
+ FlushDoneHistory getFlushDoneHistory() {
std::lock_guard<std::mutex> guard(_lock);
return _flushDoneHistory;
}
@@ -217,12 +199,11 @@ public:
search::SerialNum &currentSerial)
: _flushedSerial(flushedSerial), _currentSerial(currentSerial),
_start(start), _done(done), _proceed(proceed)
- {
- }
+ { }
void run() override {
_start.countDown();
- if (_proceed != NULL) {
+ if (_proceed != nullptr) {
_proceed->await();
}
_flushedSerial = _currentSerial;
@@ -270,8 +251,7 @@ public:
_taskStart(),
_taskDone(),
_task(std::move(task))
- {
- }
+ { }
SimpleTarget(search::SerialNum flushedSerial = 0, bool proceedImmediately = true)
: SimpleTarget("anon", flushedSerial, proceedImmediately)
@@ -316,8 +296,7 @@ public:
: SimpleTarget("anon"),
_mgain(false),
_serial(false)
- {
- }
+ { }
MemoryGain getApproxMemoryGain() const override {
LOG_ASSERT(_mgain == false);
@@ -389,8 +368,7 @@ public:
class NoFlushStrategy : public SimpleStrategy
{
- FlushContext::List getFlushTargets(const FlushContext::List &,
- const flushengine::TlsStatsMap &) const override {
+ FlushContext::List getFlushTargets(const FlushContext::List &, const flushengine::TlsStatsMap &) const override {
return FlushContext::List();
}
};
@@ -430,13 +408,11 @@ struct Fixture
: tlsStatsFactory(std::make_shared<SimpleTlsStatsFactory>()),
strategy(strategy_),
engine(tlsStatsFactory, strategy, numThreads, idleInterval)
- {
- }
+ { }
Fixture(uint32_t numThreads, vespalib::duration idleInterval)
: Fixture(numThreads, idleInterval, std::make_shared<SimpleStrategy>())
- {
- }
+ { }
void putFlushHandler(const vespalib::string &docTypeName, IFlushHandler::SP handler) {
engine.putFlushHandler(DocTypeName(docTypeName), handler);
@@ -446,17 +422,14 @@ struct Fixture
strategy->_targets.push_back(std::move(target));
}
- std::shared_ptr<SimpleHandler>
- addSimpleHandler(Targets targets)
- {
+ std::shared_ptr<SimpleHandler> addSimpleHandler(Targets targets) {
auto handler = std::make_shared<SimpleHandler>(targets, "handler", 20);
engine.putFlushHandler(DocTypeName("handler"), handler);
engine.start();
return handler;
}
- void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial)
- {
+ void assertOldestSerial(SimpleHandler &handler, search::SerialNum expOldestSerial) {
using namespace std::chrono_literals;
for (int pass = 0; pass < 600; ++pass) {
std::this_thread::sleep_for(100ms);
@@ -593,8 +566,7 @@ TEST_F("require that target can refuse flush", Fixture(2, IINTERVAL))
EXPECT_TRUE(!handler->_done.await(SHORT_TIMEOUT));
}
-TEST_F("require that targets are flushed when nothing new to flush",
- Fixture(2, IINTERVAL))
+TEST_F("require that targets are flushed when nothing new to flush", Fixture(2, IINTERVAL))
{
auto target = std::make_shared<SimpleTarget>("anon", 5); // oldest unflushed serial num = 5
auto handler = std::make_shared<SimpleHandler>(Targets({target}), "anon", 4); // current serial num = 4
@@ -640,7 +612,7 @@ TEST("require that threaded target works")
auto target = std::make_shared<ThreadedFlushTarget>(executor, getSerialNum, std::make_shared<SimpleTarget>());
EXPECT_FALSE(executor._done.await(SHORT_TIMEOUT));
- EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>()).get() != NULL);
+ EXPECT_TRUE(target->initFlush(0, std::make_shared<search::FlushToken>()));
EXPECT_TRUE(executor._done.await(LONG_TIMEOUT));
}
@@ -713,6 +685,30 @@ TEST_F("require that concurrency works", Fixture(2, 1ms))
target2->_proceed.countDown();
}
+TEST_F("require that concurrency works with triggerFlush", Fixture(2, 1ms))
+{
+ auto target1 = std::make_shared<SimpleTarget>("target1", 1, false);
+ auto target2 = std::make_shared<SimpleTarget>("target2", 2, false);
+ auto target3 = std::make_shared<SimpleTarget>("target3", 3, false);
+ auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3}), "handler", 9);
+ f.putFlushHandler("handler", handler);
+ std::thread thread([this]() { f.engine.triggerFlush(); });
+ std::this_thread::sleep_for(1s);
+ f.engine.start();
+
+ EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT));
+ EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2"});
+ EXPECT_TRUE(!target3->_initDone.await(SHORT_TIMEOUT));
+ target1->_proceed.countDown();
+ EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT));
+ assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target3"});
+ target3->_proceed.countDown();
+ target2->_proceed.countDown();
+ thread.join();
+}
+
TEST_F("require that state explorer can list flush targets", Fixture(1, 1ms))
{
auto target = std::make_shared<SimpleTarget>("target1", 100, false);