diff options
Diffstat (limited to 'storage/src/tests')
8 files changed, 145 insertions, 93 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp index d521975e0cb..a1263c9433b 100644 --- a/storage/src/tests/distributor/idealstatemanagertest.cpp +++ b/storage/src/tests/distributor/idealstatemanagertest.cpp @@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket) pending_message_tracker().insert(msg); } { - RemoveBucketOperation op(dummy_cluster_context, - BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7))); + // TODO we might not want this particular behavior for merge operations either + MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3))); // Not blocked for exact node match. EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq)); // But blocked for bucket match! diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp index e877f4601b7..971ff36c833 100644 --- a/storage/src/tests/distributor/removebucketoperationtest.cpp +++ b/storage/src/tests/distributor/removebucketoperationtest.cpp @@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) { EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1))); } +TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) { + RemoveBucketOperation op(dummy_cluster_context, + BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(1, 3))); + // In node target set + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120)); + EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120)); + // Not in node target set + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120)); + EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp index 25009156f18..f5531a134d0 100644 --- a/storage/src/tests/distributor/statecheckerstest.cpp +++ b/storage/src/tests/distributor/statecheckerstest.cpp @@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { struct PendingMessage { uint32_t _msgType; + uint16_t _node; uint8_t _pri; - PendingMessage() : _msgType(UINT32_MAX), _pri(0) {} + constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {} - PendingMessage(uint32_t msgType, uint8_t pri) - : _msgType(msgType), _pri(pri) {} + constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept + : _msgType(msgType), _node(0), _pri(pri) {} - bool shouldCheck() const { return _msgType != UINT32_MAX; } + bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; } }; void enableClusterState(const lib::ClusterState& systemState) { @@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil { IdealStateOperation::UP op(result.createOperation()); if (op.get()) { if (blocker.shouldCheck() - && op->shouldBlockThisOperation(blocker._msgType, - blocker._pri)) + && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri)) { return "BLOCKED"; } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index a174d305c27..b3bd1c6a253 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -22,6 +22,15 @@ } \ } +#define CHECK_ERROR_ASYNC(className, failType, onError) \ + { \ + Guard guard(_lock); \ + if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \ + onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \ + return; \ + } \ + } + namespace storage { namespace { @@ -87,46 +96,40 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const return _spi.getBucketInfo(bucket); } -spi::Result -PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp, - document::Document::SP doc, spi::Context& context) +void +PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::Document::SP doc, + spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")"); - CHECK_ERROR(spi::Result, FAIL_PUT); - return _spi.put(bucket, timestamp, std::move(doc), context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_PUT, onComplete); + _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(onComplete)); } -spi::RemoveResult -PersistenceProviderWrapper::remove(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const spi::DocumentId& id, - spi::Context& context) +void +PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id, + spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")"); - CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE); - return _spi.remove(bucket, timestamp, id, context); + CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete); + _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete)); } -spi::RemoveResult -PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const spi::DocumentId& id, - spi::Context& context) +void +PersistenceProviderWrapper::removeIfFoundAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id, + spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("removeIfFound(" << bucket << ", " << timestamp << ", " << id << ")"); - CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE_IF_FOUND); - return _spi.removeIfFound(bucket, timestamp, id, context); + CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE_IF_FOUND, onComplete); + _spi.removeIfFoundAsync(bucket, timestamp, id, context, std::move(onComplete)); } -spi::UpdateResult -PersistenceProviderWrapper::update(const spi::Bucket& bucket, - spi::Timestamp timestamp, - document::DocumentUpdate::SP upd, - spi::Context& context) +void +PersistenceProviderWrapper::updateAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::DocumentUpdate::SP upd, + spi::Context& context, spi::OperationComplete::UP onComplete) { LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")"); - CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE); - return _spi.update(bucket, timestamp, std::move(upd), context); + CHECK_ERROR_ASYNC(spi::UpdateResult, FAIL_UPDATE, onComplete); + _spi.updateAsync(bucket, timestamp, std::move(upd), context, std::move(onComplete)); } spi::GetResult @@ -172,13 +175,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId, return _spi.destroyIterator(iterId, context); } -spi::Result -PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket, - spi::Context& context) +void +PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, + spi::OperationComplete::UP operationComplete) { LOG_SPI("deleteBucket(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET); - return _spi.deleteBucket(bucket, context); + CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete); + _spi.deleteBucketAsync(bucket, context, std::move(operationComplete)); } spi::Result @@ -225,4 +228,26 @@ PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket, return _spi.removeEntry(bucket, timestamp, context); } +spi::Result +PersistenceProviderWrapper::initialize() { + return _spi.initialize(); +} + +spi::BucketIdListResult +PersistenceProviderWrapper::getModifiedBuckets(spi::PersistenceProvider::BucketSpace bucketSpace) const { + return _spi.getModifiedBuckets(bucketSpace); +} + +spi::Result +PersistenceProviderWrapper::setClusterState(spi::PersistenceProvider::BucketSpace bucketSpace, const spi::ClusterState &state) { + return _spi.setClusterState(bucketSpace, state); +} + +void +PersistenceProviderWrapper::setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, + spi::OperationComplete::UP onComplete) +{ + _spi.setActiveStateAsync(bucket, state, std::move(onComplete)); +} + } diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index d90fa7b2eaa..c6628814dba 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -21,7 +21,7 @@ namespace storage { -class PersistenceProviderWrapper : public spi::AbstractPersistenceProvider +class PersistenceProviderWrapper : public spi::PersistenceProvider { public: enum OPERATION_FAILURE_FLAGS @@ -47,11 +47,11 @@ public: // TODO: add more as needed }; private: - spi::PersistenceProvider& _spi; - spi::Result _result; - mutable std::mutex _lock; + spi::PersistenceProvider& _spi; + spi::Result _result; + mutable std::mutex _lock; mutable std::vector<std::string> _log; - uint32_t _failureMask; + uint32_t _failureMask; using Guard = std::lock_guard<std::mutex>; public: PersistenceProviderWrapper(spi::PersistenceProvider& spi); @@ -88,13 +88,21 @@ public: _log.clear(); } + spi::Result initialize() override; + spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; + + spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState &state) override; + + void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state, + spi::OperationComplete::UP up) override; + spi::Result createBucket(const spi::Bucket&, spi::Context&) override; spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; - spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; - spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; - spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; + void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult @@ -103,7 +111,7 @@ public: spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; spi::Result destroyIterator(spi::IteratorId, spi::Context&) override; - spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override; + void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override; spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override; spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index c51357cacd1..07d2b24d536 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -46,14 +46,15 @@ public: _deleteBucketInvocations(0) {} - spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override + void + putAsync(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&, spi::OperationComplete::UP onComplete) override { _queueBarrier.await(); // message abort stage with active opertion in disk queue std::this_thread::sleep_for(75ms); _completionBarrier.await(); // test finished - return spi::Result(); + onComplete->onComplete(std::make_unique<spi::Result>()); } spi::BucketInfoResult getBucketInfo(const spi::Bucket& bucket) const override { @@ -66,9 +67,10 @@ public: return PersistenceProviderWrapper::createBucket(bucket, ctx); } - spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override { + void + deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override { ++_deleteBucketInvocations; - return PersistenceProviderWrapper::deleteBucket(bucket, ctx); + PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete)); } }; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index fa989410137..60030004594 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_FALSE(replySent.get()); LOG(debug, "Verifying that replying the diff sends on back"); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply, stub); + handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket)); ASSERT_EQ(1, stub.replies.size()); replySent = stub.replies[0]; } @@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) { ASSERT_EQ(2, messageKeeper()._msgs.size()); ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType()); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); - auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3); ASSERT_EQ(1, reply2->getDiff().size()); reply2->getDiff()[0]._entry._hasMask |= 2u; MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*reply2, stub); + handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { } } - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); { - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper()); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket)); if (!messageKeeper()._msgs.empty()) { ASSERT_FALSE(reply.get()); @@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); MessageSenderStub stub; - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.replies.size()); @@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper()); auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>(); - auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); + auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd); ASSERT_FALSE(applyBucketDiffReply->getDiff().empty()); // Change a hasMask to indicate something changed during merging. applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5; MessageSenderStub stub; LOG(debug, "sending apply bucket diff reply"); - handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub); + handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket)); ASSERT_EQ(1, stub.commands.size()); @@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( spi::Context&) { (void) test; - api::ApplyBucketDiffReply reply(*_applyCmd); - test.fillDummyApplyDiff(reply.getDiff()); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd); + test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); - handler.handleApplyBucketDiffReply(reply, _stub); + handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); } std::string @@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) EXPECT_EQ(1, cmd2.getAddress()->getIndex()); EXPECT_EQ(1234, cmd2.getSourceIndex()); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); baseline_diff_size = cmd2.getDiff().size(); auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2); auto &diff = reply->getDiff(); @@ -1292,16 +1292,16 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking first ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Node 4 has been eliminated before the first ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 2u, s.diff.size()); - EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 2u, s->diff.size()); + EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]); auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]); // ApplyBucketDiffCommand has a shorter node list, node 2 is not present EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3); auto& diff = reply->getDiff(); EXPECT_EQ(2u, diff.size()); EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry); @@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) */ fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1. - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled first ApplyBucketDiffReply"); } ASSERT_EQ(3u, messageKeeper()._msgs.size()); @@ -1320,19 +1320,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking second ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]); auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry); // Simulate that node 3 somehow lost doc2 when trying to fill diff entry. diff[0]._entry._hasMask &= ~4u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled second ApplyBucketDiffReply"); } ASSERT_EQ(4u, messageKeeper()._msgs.size()); @@ -1340,21 +1340,21 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking third ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command - EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList); - EXPECT_EQ(baseline_diff_size + 1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]); + EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList); + EXPECT_EQ(baseline_diff_size + 1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]); auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]); EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5); auto& diff = reply->getDiff(); EXPECT_EQ(baseline_diff_size, diff.size()); for (auto& e : diff) { EXPECT_EQ(1u, e._entry._hasMask); e._entry._hasMask |= 2u; } - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled third ApplyBucketDiffReply"); } ASSERT_EQ(5u, messageKeeper()._msgs.size()); @@ -1362,19 +1362,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { LOG(debug, "checking fourth ApplyBucketDiff command"); EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket)); - auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); // All nodes in use again due to failure to fill diff entry for doc2 - EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList); - EXPECT_EQ(1u, s.diff.size()); - EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]); + EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList); + EXPECT_EQ(1u, s->diff.size()); + EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]); auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]); EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes()); - auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6); auto& diff = reply->getDiff(); EXPECT_EQ(1u, diff.size()); fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo()); diff[0]._entry._hasMask |= 2u; - handler.handleApplyBucketDiffReply(*reply, messageKeeper()); + handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket)); LOG(debug, "handled fourth ApplyBucketDiffReply"); } ASSERT_EQ(6u, messageKeeper()._msgs.size()); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 568eee1e92c..5c192942521 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1151,6 +1151,10 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); + + EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), 0); + EXPECT_EQ(_throttlers[0]->getMetrics().queueSize.getLast(), 0); + for (size_t i = 0; i < maxPending + maxQueue; ++i) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. @@ -1162,6 +1166,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist // Wait till we have maxPending replies and maxQueue queued _topLinks[0]->waitForMessages(maxPending, _messageWaitTime); waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime); + EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), maxPending); EXPECT_EQ(maxQueue, _throttlers[0]->getMetrics().queueSize.getMaximum()); // Clear all forwarded merges |