aboutsummaryrefslogtreecommitdiffstats
path: root/searchlib/src/tests/transactionlog/translogclient_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchlib/src/tests/transactionlog/translogclient_test.cpp')
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp100
1 files changed, 54 insertions, 46 deletions
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index bf3a269107c..a922795d570 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -61,21 +61,42 @@ createDomainConfig(uint32_t partSizeLimit) {
.setEncoding(Encoding(Encoding::xxh64, Encoding::none_multi));
}
+// Used to signal 'eof' when visiting in a way that makes sure the
+// stuff we have visited is visible to the test code.
+class Eof {
+private:
+ std::atomic<bool> _eof;
+public:
+ Eof() : _eof(false) {}
+ void set() {
+ _eof.store(true, std::memory_order_release);
+ }
+ bool wait() const {
+ for (size_t i = 0; !_eof.load(std::memory_order_relaxed) && (i < 120000); i++) {
+ std::this_thread::sleep_for(1ms);
+ }
+ return _eof.load(std::memory_order_acquire);
+ }
+ void clear() {
+ _eof = false;
+ }
+};
+
class CallBackTest : public Callback
{
private:
RPC::Result receive(const Packet & packet) override;
- void eof() override { _eof = true; }
+ void eof() override { _eof.set(); }
typedef std::map<SerialNum, std::unique_ptr<ByteBuffer>> PacketMap;
PacketMap _packetMap;
+ Eof _eof;
public:
- CallBackTest() : _eof(false) { }
+ CallBackTest() : _packetMap(), _eof() { }
size_t size() const { return _packetMap.size(); }
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
- void clear() { _eof = false; _packetMap.clear(); }
+ void clear() { _eof.clear(); _packetMap.clear(); }
const ByteBuffer & packet(SerialNum n) { return *(_packetMap.find(n)->second); }
-
- bool _eof;
+ bool wait_for_eof() const { return _eof.wait(); }
};
RPC::Result
@@ -96,11 +117,12 @@ class CallBackManyTest : public Callback
{
private:
RPC::Result receive(const Packet & packet) override;
- void eof() override { _eof = true; }
+ void eof() override { _eof.set(); }
+ Eof _eof;
public:
- explicit CallBackManyTest(size_t start) : _eof(false), _count(start), _value(start) { }
- void clear() { _eof = false; _count = 0; _value = 0; }
- bool _eof;
+ explicit CallBackManyTest(size_t start) : _eof(), _count(start), _value(start) { }
+ void clear() { _eof.clear(); _count = 0; _value = 0; }
+ bool wait_for_eof() const { return _eof.wait(); }
size_t _count;
size_t _value;
};
@@ -127,10 +149,11 @@ public:
typedef std::map<SerialNum, Identifiable *> PacketMap;
private:
RPC::Result receive(const Packet & packet) override;
- void eof() override { _eof = true; }
+ void eof() override { _eof.set(); }
PacketMap _packetMap;
+ Eof _eof;
public:
- CallBackUpdate() : _eof(false) { }
+ CallBackUpdate() : _packetMap(), _eof() { }
~CallBackUpdate() override {
while (_packetMap.begin() != _packetMap.end()) {
delete _packetMap.begin()->second;
@@ -139,7 +162,7 @@ public:
}
bool hasSerial(SerialNum n) const { return (_packetMap.find(n) != _packetMap.end()); }
const PacketMap & map() const { return _packetMap; }
- bool _eof;
+ bool wait_for_eof() const { return _eof.wait(); }
};
@@ -180,15 +203,16 @@ class CallBackStatsTest : public Callback
{
private:
RPC::Result receive(const Packet & packet) override;
- void eof() override { _eof = true; }
+ void eof() override { _eof.set(); }
+ Eof _eof;
public:
- CallBackStatsTest() : _eof(false),
+ CallBackStatsTest() : _eof(),
_count(0), _inOrder(0),
_firstSerial(0), _lastSerial(0),
_prevSerial(0) { }
- void clear() { _eof = false; _count = 0; _inOrder = 0;
+ void clear() { _eof.clear(); _count = 0; _inOrder = 0;
_firstSerial = 0; _lastSerial = 0; _inOrder = 0; }
- bool _eof;
+ bool wait_for_eof() const { return _eof.wait(); }
uint64_t _count;
uint64_t _inOrder; // increase when next entry is one above previous
SerialNum _firstSerial;
@@ -409,8 +433,7 @@ visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & nam
auto visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor);
EXPECT_TRUE( visitor->visit(0, 1) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- EXPECT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
EXPECT_TRUE( ! ca.hasSerial(2) );
@@ -419,8 +442,7 @@ visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & nam
visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(1, 2) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- EXPECT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ! ca.hasSerial(1) );
EXPECT_TRUE( ca.hasSerial(2) );
@@ -430,8 +452,7 @@ visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & nam
visitor = tls.createVisitor(name, ca);
EXPECT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(0, 3) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- EXPECT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( ca.hasSerial(1) );
EXPECT_TRUE( ca.hasSerial(2) );
@@ -441,8 +462,7 @@ visitDomainTest(TransLogClient & tls, Session * s1, const vespalib::string & nam
visitor = tls.createVisitor(name, ca);
ASSERT_TRUE(visitor.get());
EXPECT_TRUE( visitor->visit(2, 3) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- EXPECT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_TRUE( ! ca.hasSerial(0) );
EXPECT_TRUE( !ca.hasSerial(1) );
EXPECT_TRUE( !ca.hasSerial(2) );
@@ -554,8 +574,7 @@ partialUpdateTest(const vespalib::string & testDir) {
auto visitor = tls.createVisitor("test1", ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(5, 7) );
- for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
ASSERT_EQUAL(1u, ca.map().size());
ASSERT_TRUE( ca.hasSerial(7) );
@@ -563,24 +582,21 @@ partialUpdateTest(const vespalib::string & testDir) {
auto visitor1 = tls.createVisitor("test1", ca1);
ASSERT_TRUE(visitor1);
ASSERT_TRUE( visitor1->visit(4, 5) );
- for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca1._eof );
+ ASSERT_TRUE( ca1.wait_for_eof() );
ASSERT_TRUE( ca1.map().empty());
CallBackUpdate ca2;
auto visitor2 = tls.createVisitor("test1", ca2);
ASSERT_TRUE(visitor2);
ASSERT_TRUE( visitor2->visit(5, 6) );
- for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca2._eof );
+ ASSERT_TRUE( ca2.wait_for_eof() );
ASSERT_TRUE( ca2.map().empty());
CallBackUpdate ca3;
auto visitor3 = tls.createVisitor("test1", ca3);
ASSERT_TRUE(visitor3);
ASSERT_TRUE( visitor3->visit(5, 1000) );
- for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca3._eof );
+ ASSERT_TRUE( ca3.wait_for_eof() );
ASSERT_TRUE( ca3.map().size() == 1);
ASSERT_TRUE( ca3.hasSerial(7) );
}
@@ -634,10 +650,7 @@ assertVisitStats(TransLogClient &tls, const vespalib::string &domain,
auto visitor = tls.createVisitor(domain, ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(visitStart, visitEnd) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) {
- std::this_thread::sleep_for(10ms);
- }
- ASSERT_TRUE(ca._eof);
+ ASSERT_TRUE(ca.wait_for_eof());
EXPECT_EQUAL(expFirstSerial, ca._firstSerial);
EXPECT_EQUAL(expLastSerial, ca._lastSerial);
EXPECT_EQUAL(expCount, ca._count);
@@ -682,8 +695,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) {
auto visitor = tls.createVisitor("many", ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
@@ -703,8 +715,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) {
auto visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
@@ -724,8 +735,7 @@ testSendingAlotOfDataSync(const vespalib::string & testDir) {
auto visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
@@ -753,8 +763,7 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) {
auto visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}
@@ -774,8 +783,7 @@ void testSendingAlotOfDataAsync(const vespalib::string & testDir) {
auto visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor);
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
- for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.wait_for_eof() );
EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
}