aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests')
-rw-r--r--storage/src/tests/distributor/idealstatemanagertest.cpp4
-rw-r--r--storage/src/tests/distributor/removebucketoperationtest.cpp12
-rw-r--r--storage/src/tests/distributor/statecheckerstest.cpp12
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp87
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h28
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp10
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp80
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp5
8 files changed, 145 insertions, 93 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d521975e0cb..a1263c9433b 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
pending_message_tracker().insert(msg);
}
{
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
+ // TODO we might not want this particular behavior for merge operations either
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3)));
// Not blocked for exact node match.
EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index e877f4601b7..971ff36c833 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) {
EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1)));
}
+TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) {
+ RemoveBucketOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(1, 3)));
+ // In node target set
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120));
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120));
+ // Not in node target set
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120));
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120));
+}
+
} // storage::distributor
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 25009156f18..f5531a134d0 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
struct PendingMessage
{
uint32_t _msgType;
+ uint16_t _node;
uint8_t _pri;
- PendingMessage() : _msgType(UINT32_MAX), _pri(0) {}
+ constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {}
- PendingMessage(uint32_t msgType, uint8_t pri)
- : _msgType(msgType), _pri(pri) {}
+ constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept
+ : _msgType(msgType), _node(0), _pri(pri) {}
- bool shouldCheck() const { return _msgType != UINT32_MAX; }
+ bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; }
};
void enableClusterState(const lib::ClusterState& systemState) {
@@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
IdealStateOperation::UP op(result.createOperation());
if (op.get()) {
if (blocker.shouldCheck()
- && op->shouldBlockThisOperation(blocker._msgType,
- blocker._pri))
+ && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri))
{
return "BLOCKED";
}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index a174d305c27..b3bd1c6a253 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -22,6 +22,15 @@
} \
}
+#define CHECK_ERROR_ASYNC(className, failType, onError) \
+ { \
+ Guard guard(_lock); \
+ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
+ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
+ return; \
+ } \
+ }
+
namespace storage {
namespace {
@@ -87,46 +96,40 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const
return _spi.getBucketInfo(bucket);
}
-spi::Result
-PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp,
- document::Document::SP doc, spi::Context& context)
+void
+PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::Document::SP doc,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")");
- CHECK_ERROR(spi::Result, FAIL_PUT);
- return _spi.put(bucket, timestamp, std::move(doc), context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_PUT, onComplete);
+ _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(onComplete));
}
-spi::RemoveResult
-PersistenceProviderWrapper::remove(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const spi::DocumentId& id,
- spi::Context& context)
+void
+PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")");
- CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE);
- return _spi.remove(bucket, timestamp, id, context);
+ CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete);
+ _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete));
}
-spi::RemoveResult
-PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const spi::DocumentId& id,
- spi::Context& context)
+void
+PersistenceProviderWrapper::removeIfFoundAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("removeIfFound(" << bucket << ", " << timestamp << ", " << id << ")");
- CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE_IF_FOUND);
- return _spi.removeIfFound(bucket, timestamp, id, context);
+ CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE_IF_FOUND, onComplete);
+ _spi.removeIfFoundAsync(bucket, timestamp, id, context, std::move(onComplete));
}
-spi::UpdateResult
-PersistenceProviderWrapper::update(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- document::DocumentUpdate::SP upd,
- spi::Context& context)
+void
+PersistenceProviderWrapper::updateAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::DocumentUpdate::SP upd,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")");
- CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE);
- return _spi.update(bucket, timestamp, std::move(upd), context);
+ CHECK_ERROR_ASYNC(spi::UpdateResult, FAIL_UPDATE, onComplete);
+ _spi.updateAsync(bucket, timestamp, std::move(upd), context, std::move(onComplete));
}
spi::GetResult
@@ -172,13 +175,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
return _spi.destroyIterator(iterId, context);
}
-spi::Result
-PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket,
- spi::Context& context)
+void
+PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
+ spi::OperationComplete::UP operationComplete)
{
LOG_SPI("deleteBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET);
- return _spi.deleteBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
+ _spi.deleteBucketAsync(bucket, context, std::move(operationComplete));
}
spi::Result
@@ -225,4 +228,26 @@ PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket,
return _spi.removeEntry(bucket, timestamp, context);
}
+spi::Result
+PersistenceProviderWrapper::initialize() {
+ return _spi.initialize();
+}
+
+spi::BucketIdListResult
+PersistenceProviderWrapper::getModifiedBuckets(spi::PersistenceProvider::BucketSpace bucketSpace) const {
+ return _spi.getModifiedBuckets(bucketSpace);
+}
+
+spi::Result
+PersistenceProviderWrapper::setClusterState(spi::PersistenceProvider::BucketSpace bucketSpace, const spi::ClusterState &state) {
+ return _spi.setClusterState(bucketSpace, state);
+}
+
+void
+PersistenceProviderWrapper::setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
+ spi::OperationComplete::UP onComplete)
+{
+ _spi.setActiveStateAsync(bucket, state, std::move(onComplete));
+}
+
}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index d90fa7b2eaa..c6628814dba 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -21,7 +21,7 @@
namespace storage {
-class PersistenceProviderWrapper : public spi::AbstractPersistenceProvider
+class PersistenceProviderWrapper : public spi::PersistenceProvider
{
public:
enum OPERATION_FAILURE_FLAGS
@@ -47,11 +47,11 @@ public:
// TODO: add more as needed
};
private:
- spi::PersistenceProvider& _spi;
- spi::Result _result;
- mutable std::mutex _lock;
+ spi::PersistenceProvider& _spi;
+ spi::Result _result;
+ mutable std::mutex _lock;
mutable std::vector<std::string> _log;
- uint32_t _failureMask;
+ uint32_t _failureMask;
using Guard = std::lock_guard<std::mutex>;
public:
PersistenceProviderWrapper(spi::PersistenceProvider& spi);
@@ -88,13 +88,21 @@ public:
_log.clear();
}
+ spi::Result initialize() override;
+ spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
+
+ spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState &state) override;
+
+ void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
+ spi::OperationComplete::UP up) override;
+
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
- spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
+ void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult
@@ -103,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index c51357cacd1..07d2b24d536 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -46,14 +46,15 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override
+ void
+ putAsync(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&, spi::OperationComplete::UP onComplete) override
{
_queueBarrier.await();
// message abort stage with active opertion in disk queue
std::this_thread::sleep_for(75ms);
_completionBarrier.await();
// test finished
- return spi::Result();
+ onComplete->onComplete(std::make_unique<spi::Result>());
}
spi::BucketInfoResult getBucketInfo(const spi::Bucket& bucket) const override {
@@ -66,9 +67,10 @@ public:
return PersistenceProviderWrapper::createBucket(bucket, ctx);
}
- spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
++_deleteBucketInvocations;
- return PersistenceProviderWrapper::deleteBucket(bucket, ctx);
+ PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
};
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index fa989410137..60030004594 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_FALSE(replySent.get());
LOG(debug, "Verifying that replying the diff sends on back");
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply, stub);
+ handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket));
ASSERT_EQ(1, stub.replies.size());
replySent = stub.replies[0];
}
@@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) {
ASSERT_EQ(2, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
- auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
ASSERT_EQ(1, reply2->getDiff().size());
reply2->getDiff()[0]._entry._hasMask |= 2u;
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply2, stub);
+ handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
}
}
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
{
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket));
if (!messageKeeper()._msgs.empty()) {
ASSERT_FALSE(reply.get());
@@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
ASSERT_FALSE(applyBucketDiffReply->getDiff().empty());
// Change a hasMask to indicate something changed during merging.
applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5;
MessageSenderStub stub;
LOG(debug, "sending apply bucket diff reply");
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.commands.size());
@@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
spi::Context&)
{
(void) test;
- api::ApplyBucketDiffReply reply(*_applyCmd);
- test.fillDummyApplyDiff(reply.getDiff());
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd);
+ test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
- handler.handleApplyBucketDiffReply(reply, _stub);
+ handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
}
std::string
@@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(1, cmd2.getAddress()->getIndex());
EXPECT_EQ(1234, cmd2.getSourceIndex());
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
baseline_diff_size = cmd2.getDiff().size();
auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
auto &diff = reply->getDiff();
@@ -1292,16 +1292,16 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking first ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Node 4 has been eliminated before the first ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
- EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]);
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 2u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]);
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
auto& diff = reply->getDiff();
EXPECT_EQ(2u, diff.size());
EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
@@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
*/
fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1.
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled first ApplyBucketDiffReply");
}
ASSERT_EQ(3u, messageKeeper()._msgs.size());
@@ -1320,19 +1320,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking second ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
// Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
diff[0]._entry._hasMask &= ~4u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled second ApplyBucketDiffReply");
}
ASSERT_EQ(4u, messageKeeper()._msgs.size());
@@ -1340,21 +1340,21 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking third ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5);
auto& diff = reply->getDiff();
EXPECT_EQ(baseline_diff_size, diff.size());
for (auto& e : diff) {
EXPECT_EQ(1u, e._entry._hasMask);
e._entry._hasMask |= 2u;
}
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled third ApplyBucketDiffReply");
}
ASSERT_EQ(5u, messageKeeper()._msgs.size());
@@ -1362,19 +1362,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking fourth ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// All nodes in use again due to failure to fill diff entry for doc2
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
- EXPECT_EQ(1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
+ EXPECT_EQ(1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]);
auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
ASSERT_EQ(6u, messageKeeper()._msgs.size());
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 568eee1e92c..5c192942521 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1151,6 +1151,10 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), 0);
+ EXPECT_EQ(_throttlers[0]->getMetrics().queueSize.getLast(), 0);
+
for (size_t i = 0; i < maxPending + maxQueue; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
@@ -1162,6 +1166,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
// Wait till we have maxPending replies and maxQueue queued
_topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime);
+ EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), maxPending);
EXPECT_EQ(maxQueue, _throttlers[0]->getMetrics().queueSize.getMaximum());
// Clear all forwarded merges