// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/document/base/testdocman.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/persistence/mergehandler.h>
#include <tests/persistence/persistencetestutils.h>
#include <tests/persistence/common/persistenceproviderwrapper.h>
#include <tests/distributor/messagesenderstub.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <gmock/gmock.h>
#include <cmath>

#include <vespa/log/log.h>
LOG_SETUP(".test.persistence.handler.merge");

using document::test::makeDocumentBucket;
using namespace ::testing;

namespace storage {

struct MergeHandlerTest : SingleDiskPersistenceTestUtils {
    uint32_t _location; // Location used for all merge tests
    document::Bucket _bucket; // Bucket used for all merge tests
    uint64_t _maxTimestamp;
    std::vector<api::MergeBucketCommand::Node> _nodes;
    std::unique_ptr<spi::Context> _context;

    // Fetch a single command or reply; doesn't care which.
    template <typename T>
    std::shared_ptr<T> fetchSingleMessage();

    void SetUp() override;

    enum ChainPos { FRONT, MIDDLE, BACK };
    void setUpChain(ChainPos);

    void testGetBucketDiffChain(bool midChain);
    void testApplyBucketDiffChain(bool midChain);
    // @TODO Add test verifying that buildBucketInfo and mergeLists create a minimal list (wrong sorting screws this up)

    void fillDummyApplyDiff(std::vector<api::ApplyBucketDiffCommand::Entry>& diff);
    std::shared_ptr<api::ApplyBucketDiffCommand> createDummyApplyDiff(
            int timestampOffset,
            uint16_t hasMask = 0x1,
            bool filled = true);
    std::shared_ptr<api::GetBucketDiffCommand> createDummyGetBucketDiff(
            int timestampOffset,
            uint16_t hasMask);

    struct ExpectedExceptionSpec // Try saying this out loud 3 times in a row.
    {
        uint32_t mask;
        const char* expected;
    };

    class HandlerInvoker {
    public:
        virtual ~HandlerInvoker() = default;
        virtual void beforeInvoke(MergeHandlerTest&, MergeHandler&, spi::Context&) {}
        virtual void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) = 0;
        virtual std::string afterInvoke(MergeHandlerTest&, MergeHandler&) = 0;
    };
    friend class HandlerInvoker;

    class NoReplyHandlerInvoker : public HandlerInvoker {
    public:
        std::string afterInvoke(MergeHandlerTest&, MergeHandler&) override;
    };

    template <typename T>
    std::string checkMessage(api::ReturnCode::Result expectedResult);

    class HandleMergeBucketInvoker : public NoReplyHandlerInvoker {
    public:
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
    };

    class HandleMergeBucketReplyInvoker : public NoReplyHandlerInvoker {
    public:
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
    };

    class HandleGetBucketDiffInvoker : public NoReplyHandlerInvoker {
    public:
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
    };

    class MultiPositionHandlerInvoker : public HandlerInvoker {
    public:
        MultiPositionHandlerInvoker() : _pos(FRONT) {}
        void setChainPos(ChainPos pos) { _pos = pos; }
        ChainPos getChainPos() const { return _pos; }
    private:
        ChainPos _pos;
    };

    class HandleGetBucketDiffReplyInvoker : public HandlerInvoker {
    public:
        HandleGetBucketDiffReplyInvoker();
        ~HandleGetBucketDiffReplyInvoker() override;
        void beforeInvoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
        std::string afterInvoke(MergeHandlerTest&, MergeHandler&) override;
    private:
        MessageSenderStub _stub;
        std::shared_ptr<api::GetBucketDiffCommand> _diffCmd;
    };

    class HandleApplyBucketDiffInvoker : public NoReplyHandlerInvoker {
    public:
        HandleApplyBucketDiffInvoker() : _counter(0) {}
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
    private:
        int _counter;
    };
    class HandleApplyBucketDiffReplyInvoker : public MultiPositionHandlerInvoker {
    public:
        HandleApplyBucketDiffReplyInvoker();
        ~HandleApplyBucketDiffReplyInvoker() override;
        void beforeInvoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
        void invoke(MergeHandlerTest&, MergeHandler&, spi::Context&) override;
        std::string afterInvoke(MergeHandlerTest&, MergeHandler&) override;
    private:
        int _counter;
        MessageSenderStub _stub;
        std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd;
    };

    std::string doTestSPIException(MergeHandler& handler,
                                   PersistenceProviderWrapper& providerWrapper,
                                   HandlerInvoker& invoker,
                                   const ExpectedExceptionSpec& spec);
};

MergeHandlerTest::HandleGetBucketDiffReplyInvoker::HandleGetBucketDiffReplyInvoker() = default;
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::~HandleGetBucketDiffReplyInvoker() = default;

MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::HandleApplyBucketDiffReplyInvoker()
    : _counter(0),
      _stub(),
      _applyCmd()
{}

MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::~HandleApplyBucketDiffReplyInvoker() = default;

void
MergeHandlerTest::SetUp()
{
    _context.reset(new spi::Context(documentapi::LoadType::DEFAULT, 0, 0));
    SingleDiskPersistenceTestUtils::SetUp();

    _location = 1234;
    _bucket = makeDocumentBucket(document::BucketId(16, _location));
    _maxTimestamp = 11501;

    LOG(debug, "Creating %s in bucket database", _bucket.toString().c_str());
    bucketdb::StorageBucketInfo bucketDBEntry;
    bucketDBEntry.disk = 0;
    getEnv().getBucketDatabase(_bucket.getBucketSpace()).insert(
            _bucket.getBucketId(), bucketDBEntry, "mergetestsetup");

    LOG(debug, "Creating bucket to merge");
    createTestBucket(_bucket);

    setUpChain(FRONT);
}

void
MergeHandlerTest::setUpChain(ChainPos pos) {
    _nodes.clear();
    if (pos != FRONT) {
        _nodes.push_back(api::MergeBucketCommand::Node(2, false));
    }
    _nodes.push_back(api::MergeBucketCommand::Node(0, false));
    if (pos != BACK) {
        _nodes.push_back(api::MergeBucketCommand::Node(1, false));
    }
}

// Test a regular merge bucket command fetching data, including
// puts, removes, unrevertable removes & duplicates.
TEST_F(MergeHandlerTest, merge_bucket_command) {
    MergeHandler handler(getPersistenceProvider(), getEnv());

    LOG(debug, "Handle a merge bucket command");
    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    cmd.setSourceIndex(1234);
    MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);

    LOG(debug, "Check state");
    ASSERT_EQ(1, messageKeeper()._msgs.size());
    ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
    auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]);
    EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes()));
    auto diff = cmd2.getDiff();
    EXPECT_EQ(17, diff.size());
    EXPECT_EQ(1, cmd2.getAddress()->getIndex());
    EXPECT_EQ(1234, cmd2.getSourceIndex());

    tracker->generateReply(cmd);
    EXPECT_FALSE(tracker->getReply().get());
}
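// The chain tests below verify how diff commands travel along the merge node
// chain set up by setUpChain(): a node in the MIDDLE of the chain is expected
// to forward the command to the next node and only answer once the reply
// comes back, while the node at the BACK of the chain turns the command
// around and replies immediately.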
void
MergeHandlerTest::testGetBucketDiffChain(bool midChain)
{
    setUpChain(midChain ? MIDDLE : BACK);
    MergeHandler handler(getPersistenceProvider(), getEnv());

    LOG(debug, "Verifying that get bucket diff is sent on");
    api::GetBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
    MessageTracker::UP tracker1 = handler.handleGetBucketDiff(cmd, *_context);
    api::StorageMessage::SP replySent = tracker1->getReply();

    if (midChain) {
        LOG(debug, "Check state");
        ASSERT_EQ(1, messageKeeper()._msgs.size());
        ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
        auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]);
        EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes()));
        auto diff = cmd2.getDiff();
        EXPECT_EQ(17, diff.size());
        EXPECT_EQ(1, cmd2.getAddress()->getIndex());

        LOG(debug, "Verifying that replying the diff sends on back");
        auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);

        ASSERT_FALSE(replySent.get());

        MessageSenderStub stub;
        handler.handleGetBucketDiffReply(*reply, stub);
        ASSERT_EQ(1, stub.replies.size());
        replySent = stub.replies[0];
    }

    auto reply2 = std::dynamic_pointer_cast<api::GetBucketDiffReply>(replySent);
    ASSERT_TRUE(reply2.get());

    EXPECT_THAT(_nodes, ContainerEq(reply2->getNodes()));
    auto diff = reply2->getDiff();
    EXPECT_EQ(17, diff.size());
}

TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) {
    testGetBucketDiffChain(true);
}

TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) {
    testGetBucketDiffChain(false);
}

// Test that a simple merge with a single document to merge sends
// apply bucket diff through the entire chain of 3 nodes.
void
MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
{
    setUpChain(midChain ? MIDDLE : BACK);
    MergeHandler handler(getPersistenceProvider(), getEnv());

    LOG(debug, "Verifying that apply bucket diff is sent on");
    api::ApplyBucketDiffCommand cmd(_bucket, _nodes, _maxTimestamp);
    MessageTracker::UP tracker1 = handler.handleApplyBucketDiff(cmd, *_context);
    api::StorageMessage::SP replySent = tracker1->getReply();

    if (midChain) {
        LOG(debug, "Check state");
        ASSERT_EQ(1, messageKeeper()._msgs.size());
        ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[0]->getType());
        auto& cmd2 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[0]);
        EXPECT_THAT(_nodes, ContainerEq(cmd2.getNodes()));
        auto diff = cmd2.getDiff();
        EXPECT_EQ(0, diff.size());
        EXPECT_EQ(1, cmd2.getAddress()->getIndex());
        EXPECT_FALSE(replySent.get());

        LOG(debug, "Verifying that replying the diff sends on back");
        auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2);

        MessageSenderStub stub;
        handler.handleApplyBucketDiffReply(*reply, stub);
        ASSERT_EQ(1, stub.replies.size());
        replySent = stub.replies[0];
    }

    auto reply2 = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(replySent);
    ASSERT_TRUE(reply2.get());

    EXPECT_THAT(_nodes, ContainerEq(reply2->getNodes()));
    auto diff = reply2->getDiff();
    EXPECT_EQ(0, diff.size());
}

TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) {
    testApplyBucketDiffChain(true);
}

TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) {
    testApplyBucketDiffChain(false);
}

// Test that a simple merge with a single entry to merge sends the
// correct commands and finishes.
TEST_F(MergeHandlerTest, master_message_flow) {
    MergeHandler handler(getPersistenceProvider(), getEnv());

    LOG(debug, "Handle a merge bucket command");
    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    LOG(debug, "Check state");
    ASSERT_EQ(1, messageKeeper()._msgs.size());
    ASSERT_EQ(api::MessageType::GETBUCKETDIFF, messageKeeper()._msgs[0]->getType());
    auto& cmd2 = dynamic_cast<api::GetBucketDiffCommand&>(*messageKeeper()._msgs[0]);

    auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
    // The end of the chain may remove entries that all nodes have. This
    // should leave a single entry which the master node has but the other
    // nodes don't.
    reply->getDiff().resize(1);

    handler.handleGetBucketDiffReply(*reply, messageKeeper());

    LOG(debug, "Check state");
    ASSERT_EQ(2, messageKeeper()._msgs.size());
    ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
    auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);

    auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
    ASSERT_EQ(1, reply2->getDiff().size());
    reply2->getDiff()[0]._entry._hasMask |= 2u;

    MessageSenderStub stub;
    handler.handleApplyBucketDiffReply(*reply2, stub);

    ASSERT_EQ(1, stub.replies.size());

    auto reply3 = std::dynamic_pointer_cast<api::MergeBucketReply>(stub.replies[0]);
    ASSERT_TRUE(reply3.get());

    EXPECT_THAT(_nodes, ContainerEq(reply3->getNodes()));
    EXPECT_TRUE(reply3->getResult().success());

    EXPECT_FALSE(fsHandler().isMerging(_bucket));
}

template <typename T>
std::shared_ptr<T>
MergeHandlerTest::fetchSingleMessage()
{
    std::vector<api::StorageMessage::SP>& msgs(messageKeeper()._msgs);
    if (msgs.empty()) {
        std::ostringstream oss;
        oss << "No messages available to fetch (expected type "
            << typeid(T).name()
            << ")";
        throw std::runtime_error(oss.str());
    }
    std::shared_ptr<T> ret(std::dynamic_pointer_cast<T>(messageKeeper()._msgs.back()));
    if (!ret) {
        std::ostringstream oss;
        oss << "Expected message of type "
            << typeid(T).name()
            << ", but got "
            << messageKeeper()._msgs[0]->toString();
        throw std::runtime_error(oss.str());
    }
    messageKeeper()._msgs.pop_back();
    return ret;
}

namespace {

size_t getFilledCount(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
{
    size_t filledCount = 0;
    for (size_t i=0; i<diff.size(); ++i) {
        if (diff[i].filled()) {
            ++filledCount;
        }
    }
    return filledCount;
}

size_t getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
{
    size_t filledSize = 0;
    for (size_t i=0; i<diff.size(); ++i) {
        filledSize += diff[i]._headerBlob.size();
        filledSize += diff[i]._bodyBlob.size();
    }
    return filledSize;
}

}

TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
    uint32_t docSize = 1024;
    uint32_t docCount = 10;
    uint32_t maxChunkSize = docSize * 3;
    for (uint32_t i = 0; i < docCount; ++i) {
        doPut(1234, spi::Timestamp(4000 + i), docSize, docSize);
    }

    MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);

    LOG(debug, "Handle a merge bucket command");
    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
    auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);
    handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());

    uint32_t totalDiffs = getBucketDiffCmd->getDiff().size();
    std::set<spi::Timestamp> seen;

    api::MergeBucketReply::SP reply;
    while (seen.size() != totalDiffs) {
        auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();

        LOG(debug, "Test that we get chunked diffs in ApplyBucketDiff");
        auto& diff = applyBucketDiffCmd->getDiff();
        ASSERT_LT(getFilledCount(diff), totalDiffs);
        ASSERT_LE(getFilledDataSize(diff), maxChunkSize);

        // Include node 1 in hasmask for all diffs to indicate it's done.
        // Also remember the diffs we've seen thus far to ensure chunking
        // does not send duplicates.
        for (size_t i = 0; i < diff.size(); ++i) {
            if (!diff[i].filled()) {
                continue;
            }
            diff[i]._entry._hasMask |= 2u;
            auto inserted = seen.emplace(spi::Timestamp(diff[i]._entry._timestamp));
            if (!inserted.second) {
                FAIL() << "Diff for " << diff[i]
                       << " has already been seen in another ApplyBucketDiff";
            }
        }

        auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
        {
            handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper());

            if (!messageKeeper()._msgs.empty()) {
                ASSERT_FALSE(reply.get());
                reply = std::dynamic_pointer_cast<api::MergeBucketReply>(
                        messageKeeper()._msgs[messageKeeper()._msgs.size() - 1]);
            }
        }
    }
    LOG(debug, "Done with applying diff");

    ASSERT_TRUE(reply.get());
    EXPECT_THAT(_nodes, ContainerEq(reply->getNodes()));
    EXPECT_TRUE(reply->getResult().success());
}

TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) {
    setUpChain(FRONT);

    uint32_t docSize = 1024;
    uint32_t docCount = 3;
    uint32_t maxChunkSize = 1024 + 1024 + 512;

    for (uint32_t i = 0; i < docCount; ++i) {
        doPut(1234, spi::Timestamp(4000 + i), docSize, docSize);
    }

    std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
    for (uint32_t i = 0; i < docCount; ++i) {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = 4000 + i;
        if (i == 0) {
            e._headerBlob.resize(docSize);
        }
        e._entry._hasMask = 0x3;
        e._entry._flags = MergeHandler::IN_USE;
        applyDiff.push_back(e);
    }

    setUpChain(MIDDLE);
    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, maxChunkSize);
    applyBucketDiffCmd->getDiff() = applyDiff;

    MergeHandler handler(getPersistenceProvider(), getEnv(), maxChunkSize);
    handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);

    auto fwdDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
    // Should not fill up more than chunk size allows for
    EXPECT_EQ(2, getFilledCount(fwdDiffCmd->getDiff()));
    EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize);
}

TEST_F(MergeHandlerTest, max_timestamp) {
    doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024);

    MergeHandler handler(getPersistenceProvider(), getEnv());
    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    auto getCmd = fetchSingleMessage<api::GetBucketDiffCommand>();

    ASSERT_FALSE(getCmd->getDiff().empty());
    EXPECT_LE(getCmd->getDiff().back()._timestamp, _maxTimestamp);
}

void
MergeHandlerTest::fillDummyApplyDiff(std::vector<api::ApplyBucketDiffCommand::Entry>& diff)
{
    document::TestDocMan docMan;
    document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location));
    std::vector<char> headerBlob;
    {
        vespalib::nbostream stream;
        doc->serializeHeader(stream);
        headerBlob.resize(stream.size());
        memcpy(&headerBlob[0], stream.peek(), stream.size());
    }

    assert(diff.size() == 3);
    diff[0]._headerBlob = headerBlob;
    diff[1]._docName = doc->getId().toString();
    diff[2]._docName = doc->getId().toString();
}
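// Builds a three-entry dummy diff covering the flag combinations the merge
// handler deals with: a plain put (IN_USE), a remove (IN_USE | DELETED) and
// an unrevertable remove (IN_USE | DELETED | DELETED_IN_PLACE). The hasMask
// records which nodes already have each entry, with bit n corresponding to
// position n in the merge command's node list; that is why the tests above
// flip bit 1 to mark node 1 as done.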
std::shared_ptr<api::ApplyBucketDiffCommand>
MergeHandlerTest::createDummyApplyDiff(int timestampOffset,
                                       uint16_t hasMask,
                                       bool filled)
{
    std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
    {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = timestampOffset;
        e._entry._hasMask = hasMask;
        e._entry._flags = MergeHandler::IN_USE;
        applyDiff.push_back(e);
    }
    {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = timestampOffset + 1;
        e._entry._hasMask = hasMask;
        e._entry._flags = MergeHandler::IN_USE | MergeHandler::DELETED;
        applyDiff.push_back(e);
    }
    {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = timestampOffset + 2;
        e._entry._hasMask = hasMask;
        e._entry._flags = MergeHandler::IN_USE |
                          MergeHandler::DELETED |
                          MergeHandler::DELETED_IN_PLACE;
        applyDiff.push_back(e);
    }

    if (filled) {
        fillDummyApplyDiff(applyDiff);
    }

    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
    applyBucketDiffCmd->getDiff() = applyDiff;
    return applyBucketDiffCmd;
}

// Must match up with diff used in createDummyApplyDiff
std::shared_ptr<api::GetBucketDiffCommand>
MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask)
{
    std::vector<api::GetBucketDiffCommand::Entry> diff;
    {
        api::GetBucketDiffCommand::Entry e;
        e._timestamp = timestampOffset;
        e._hasMask = hasMask;
        e._flags = MergeHandler::IN_USE;
        diff.push_back(e);
    }
    {
        api::GetBucketDiffCommand::Entry e;
        e._timestamp = timestampOffset + 1;
        e._hasMask = hasMask;
        e._flags = MergeHandler::IN_USE | MergeHandler::DELETED;
        diff.push_back(e);
    }
    {
        api::GetBucketDiffCommand::Entry e;
        e._timestamp = timestampOffset + 2;
        e._hasMask = hasMask;
        e._flags = MergeHandler::IN_USE |
                   MergeHandler::DELETED |
                   MergeHandler::DELETED_IN_PLACE;
        diff.push_back(e);
    }

    auto getBucketDiffCmd = std::make_shared<api::GetBucketDiffCommand>(_bucket, _nodes, 1024*1024);
    getBucketDiffCmd->getDiff() = diff;
    return getBucketDiffCmd;
}

TEST_F(MergeHandlerTest, spi_flush_guard) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    MergeHandler handler(providerWrapper, getEnv());

    providerWrapper.setResult(
            spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));

    setUpChain(MIDDLE);
    // Fail applying unrevertable remove
    providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_REMOVE);
    providerWrapper.clearOperationLog();
    try {
        handler.handleApplyBucketDiff(*createDummyApplyDiff(6000), *_context);
        FAIL() << "No exception thrown on failing in-place remove";
    } catch (const std::runtime_error& e) {
        EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos);
    }
    // Test that we always flush after applying diff locally, even when
    // errors are encountered.
    const std::vector<std::string>& opLog(providerWrapper.getOperationLog());
    ASSERT_FALSE(opLog.empty());
    EXPECT_EQ("flush(Bucket(0x40000000000004d2, partition 0))", opLog.back());
}

TEST_F(MergeHandlerTest, bucket_not_found_in_db) {
    MergeHandler handler(getPersistenceProvider(), getEnv());
    // Send merge for unknown bucket
    api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp);
    MessageTracker::UP tracker = handler.handleMergeBucket(cmd, *_context);
    EXPECT_TRUE(tracker->getResult().isBucketDisappearance());
}

TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
    MergeHandler handler(getPersistenceProvider(), getEnv());
    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
    auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);

    handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());

    auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
    auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);

    MessageSenderStub stub;
    handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);

    ASSERT_EQ(1, stub.replies.size());

    auto mergeReply = std::dynamic_pointer_cast<api::MergeBucketReply>(stub.replies[0]);
    ASSERT_TRUE(mergeReply.get());
    EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE);
}
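// If an ApplyBucketDiffReply comes back without a single hasMask having
// changed, no progress was made and the safeguard above fails the merge with
// INTERNAL_FAILURE. The test below instead flips a hasMask bit, which should
// keep the merge going by resending an ApplyBucketDiff with the updated state
// rather than failing it.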
TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
    MergeHandler handler(getPersistenceProvider(), getEnv());

    _nodes.clear();
    _nodes.emplace_back(0, false);
    _nodes.emplace_back(1, false);
    _nodes.emplace_back(2, false);

    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();
    auto getBucketDiffReply = std::make_unique<api::GetBucketDiffReply>(*getBucketDiffCmd);

    handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());

    auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
    auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
    ASSERT_FALSE(applyBucketDiffReply->getDiff().empty());
    // Change a hasMask to indicate something changed during merging.
    applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5;

    MessageSenderStub stub;
    LOG(debug, "sending apply bucket diff reply");
    handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);

    ASSERT_EQ(1, stub.commands.size());

    auto applyBucketDiffCmd2 = std::dynamic_pointer_cast<api::ApplyBucketDiffCommand>(stub.commands[0]);
    ASSERT_TRUE(applyBucketDiffCmd2.get());
    ASSERT_EQ(applyBucketDiffCmd->getDiff().size(), applyBucketDiffCmd2->getDiff().size());
    EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask);
}

TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) {
    MergeHandler handler(getPersistenceProvider(), getEnv());
    std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
    {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = 13001; // Removed in persistence
        e._entry._hasMask = 0x2;
        e._entry._flags = MergeHandler::IN_USE;
        applyDiff.push_back(e);
    }
    setUpChain(BACK);
    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
    applyBucketDiffCmd->getDiff() = applyDiff;

    auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);

    auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(tracker->getReply());
    ASSERT_TRUE(applyBucketDiffReply.get());

    auto& diff = applyBucketDiffReply->getDiff();
    ASSERT_EQ(1, diff.size());
    EXPECT_FALSE(diff[0].filled());
    EXPECT_EQ(0x0, diff[0]._entry._hasMask);
}
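// Shared harness for the *_spi_failures tests below: the provider wrapper is
// told to fail one specific SPI operation (spec.mask), the invoker drives a
// single merge handler entry point, and the returned string is empty on
// success. A non-empty string names what went wrong: the expected exception
// did not surface, the merge state was left registered, or the invoker's
// postcondition check failed.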
std::string
MergeHandlerTest::doTestSPIException(MergeHandler& handler,
                                     PersistenceProviderWrapper& providerWrapper,
                                     HandlerInvoker& invoker,
                                     const ExpectedExceptionSpec& spec)
{
    providerWrapper.setFailureMask(0);
    invoker.beforeInvoke(*this, handler, *_context); // Do any setup stuff first

    uint32_t failureMask = spec.mask;
    const char* expectedSubstring = spec.expected;
    providerWrapper.setFailureMask(failureMask);
    try {
        invoker.invoke(*this, handler, *_context);
        if (failureMask != 0) {
            return (std::string("No exception was thrown during handler "
                                "invocation. Expected exception containing '")
                    + expectedSubstring + "'");
        }
    } catch (const std::runtime_error& e) {
        if (std::string(e.what()).find(expectedSubstring) == std::string::npos) {
            return (std::string("Expected exception to contain substring '")
                    + expectedSubstring + "', but message was: " + e.what());
        }
    }
    if (fsHandler().isMerging(_bucket)) {
        return (std::string("After operation with expected exception '")
                + expectedSubstring + "', merge state was not cleared");
    }
    // Postcondition check.
    std::string check = invoker.afterInvoke(*this, handler);
    if (!check.empty()) {
        return (std::string("Postcondition validation failed for operation "
                            "with expected exception '")
                + expectedSubstring + "': " + check);
    }
    return "";
}

std::string
MergeHandlerTest::NoReplyHandlerInvoker::afterInvoke(MergeHandlerTest& test, MergeHandler& handler)
{
    (void) handler;
    if (!test.messageKeeper()._msgs.empty()) {
        std::ostringstream ss;
        ss << "Expected 0 explicit replies, got "
           << test.messageKeeper()._msgs.size();
        return ss.str();
    }
    return "";
}

template <typename T>
std::string
MergeHandlerTest::checkMessage(api::ReturnCode::Result expectedResult)
{
    try {
        std::shared_ptr<T> msg(fetchSingleMessage<T>());
        if (msg->getResult().getResult() != expectedResult) {
            return "Got unexpected result: " + msg->getResult().toString();
        }
    } catch (std::exception& e) {
        return e.what();
    }
    return "";
}

void
MergeHandlerTest::HandleMergeBucketInvoker::invoke(MergeHandlerTest& test,
                                                   MergeHandler& handler,
                                                   spi::Context& context)
{
    api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
    handler.handleMergeBucket(cmd, context);
}

TEST_F(MergeHandlerTest, merge_bucket_spi_failures) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    MergeHandler handler(providerWrapper, getEnv());
    providerWrapper.setResult(
            spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));
    setUpChain(MIDDLE);

    ExpectedExceptionSpec exceptions[] = {
        { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
        { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
        { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
        { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
    };
    typedef ExpectedExceptionSpec* ExceptionIterator;
    ExceptionIterator last = exceptions + sizeof(exceptions)/sizeof(exceptions[0]);

    for (ExceptionIterator it = exceptions; it != last; ++it) {
        HandleMergeBucketInvoker invoker;
        EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
    }
}

void
MergeHandlerTest::HandleGetBucketDiffInvoker::invoke(MergeHandlerTest& test,
                                                     MergeHandler& handler,
                                                     spi::Context& context)
{
    api::GetBucketDiffCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
    handler.handleGetBucketDiff(cmd, context);
}

TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    MergeHandler handler(providerWrapper, getEnv());
    providerWrapper.setResult(
            spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));
    setUpChain(MIDDLE);

    ExpectedExceptionSpec exceptions[] = {
        { PersistenceProviderWrapper::FAIL_CREATE_BUCKET, "create bucket" },
        { PersistenceProviderWrapper::FAIL_BUCKET_INFO, "get bucket info" },
        { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
        { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
    };
    typedef ExpectedExceptionSpec* ExceptionIterator;
    ExceptionIterator last = exceptions + sizeof(exceptions)/sizeof(exceptions[0]);

    for (ExceptionIterator it = exceptions; it != last; ++it) {
        HandleGetBucketDiffInvoker invoker;
        EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
    }
}

void
MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke(MergeHandlerTest& test,
                                                       MergeHandler& handler,
                                                       spi::Context& context)
{
    ++_counter;
    std::shared_ptr<api::ApplyBucketDiffCommand> cmd(
            test.createDummyApplyDiff(100000 * _counter));
    handler.handleApplyBucketDiff(*cmd, context);
}
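// Applying a diff both reads via an iterator and writes the received entries
// locally, so on top of the iterator failures exercised above this test also
// covers failing put, remove and flush.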
TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    MergeHandler handler(providerWrapper, getEnv());
    providerWrapper.setResult(
            spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));
    setUpChain(MIDDLE);

    ExpectedExceptionSpec exceptions[] = {
        { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
        { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
        { PersistenceProviderWrapper::FAIL_PUT, "Failed put" },
        { PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" },
        { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" },
    };
    typedef ExpectedExceptionSpec* ExceptionIterator;
    ExceptionIterator last = exceptions + sizeof(exceptions)/sizeof(exceptions[0]);

    for (ExceptionIterator it = exceptions; it != last; ++it) {
        HandleApplyBucketDiffInvoker invoker;
        EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
        // Casual, in-place testing of bug 6752085.
        // This will fail if we give NaN to the metric in question.
        EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast()));
    }
}

void
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::beforeInvoke(MergeHandlerTest& test,
                                                                MergeHandler& handler,
                                                                spi::Context& context)
{
    api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
    handler.handleMergeBucket(cmd, context);
    _diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
}

void
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::invoke(MergeHandlerTest& test,
                                                          MergeHandler& handler,
                                                          spi::Context&)
{
    (void) test;
    api::GetBucketDiffReply reply(*_diffCmd);
    handler.handleGetBucketDiffReply(reply, _stub);
}

std::string
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke(MergeHandlerTest& test,
                                                               MergeHandler& handler)
{
    (void) handler;
    if (!_stub.commands.empty()) {
        return "Unexpected commands in reply stub";
    }
    if (!_stub.replies.empty()) {
        return "Unexpected replies in reply stub";
    }
    // Initial merge bucket should have been replied to by clearMergeStatus.
    return test.checkMessage<api::MergeBucketReply>(api::ReturnCode::INTERNAL_FAILURE);
}

TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    MergeHandler handler(providerWrapper, getEnv());
    providerWrapper.setResult(
            spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));
    HandleGetBucketDiffReplyInvoker invoker;

    setUpChain(FRONT);

    ExpectedExceptionSpec exceptions[] = {
        { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
        { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
    };
    typedef ExpectedExceptionSpec* ExceptionIterator;
    ExceptionIterator last = exceptions + sizeof(exceptions)/sizeof(exceptions[0]);

    for (ExceptionIterator it = exceptions; it != last; ++it) {
        EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
    }
}

void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke(MergeHandlerTest& test,
                                                                  MergeHandler& handler,
                                                                  spi::Context& context)
{
    ++_counter;
    _stub.clear();
    if (getChainPos() == FRONT) {
        api::MergeBucketCommand cmd(test._bucket, test._nodes, test._maxTimestamp);
        handler.handleMergeBucket(cmd, context);
        auto diffCmd = test.fetchSingleMessage<api::GetBucketDiffCommand>();
        auto dummyDiff = test.createDummyGetBucketDiff(100000 * _counter, 0x4);
        diffCmd->getDiff() = dummyDiff->getDiff();

        api::GetBucketDiffReply diffReply(*diffCmd);
        handler.handleGetBucketDiffReply(diffReply, _stub);

        assert(_stub.commands.size() == 1);
        _applyCmd = std::dynamic_pointer_cast<api::ApplyBucketDiffCommand>(_stub.commands[0]);
    } else {
        // Pretend the last node in the chain has the data and that it will be
        // fetched when the chain is unwound.
        auto cmd = test.createDummyApplyDiff(100000 * _counter, 0x4, false);
        handler.handleApplyBucketDiff(*cmd, context);
        _applyCmd = test.fetchSingleMessage<api::ApplyBucketDiffCommand>();
    }
}

void
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(MergeHandlerTest& test,
                                                            MergeHandler& handler,
                                                            spi::Context&)
{
    (void) test;
    api::ApplyBucketDiffReply reply(*_applyCmd);
    test.fillDummyApplyDiff(reply.getDiff());
    _stub.clear();
    handler.handleApplyBucketDiffReply(reply, _stub);
}

std::string
MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke(MergeHandlerTest& test,
                                                                 MergeHandler& handler)
{
    (void) handler;
    if (!_stub.commands.empty()) {
        return "Unexpected commands in reply stub";
    }
    if (!_stub.replies.empty()) {
        return "Unexpected replies in reply stub";
    }
    if (getChainPos() == FRONT) {
        return test.checkMessage<api::MergeBucketReply>(api::ReturnCode::INTERNAL_FAILURE);
    } else {
        return test.checkMessage<api::ApplyBucketDiffReply>(api::ReturnCode::INTERNAL_FAILURE);
    }
}

TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) {
    PersistenceProviderWrapper providerWrapper(getPersistenceProvider());
    HandleApplyBucketDiffReplyInvoker invoker;
    for (int i = 0; i < 2; ++i) {
        ChainPos pos(i == 0 ? FRONT : MIDDLE);
        setUpChain(pos);
        invoker.setChainPos(pos);
        MergeHandler handler(providerWrapper, getEnv());
        providerWrapper.setResult(
                spi::Result(spi::Result::PERMANENT_ERROR, "who you gonna call?"));

        ExpectedExceptionSpec exceptions[] = {
            { PersistenceProviderWrapper::FAIL_CREATE_ITERATOR, "create iterator" },
            { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" },
            { PersistenceProviderWrapper::FAIL_PUT, "Failed put" },
            { PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" },
            { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" },
        };
        typedef ExpectedExceptionSpec* ExceptionIterator;
        ExceptionIterator last = exceptions + sizeof(exceptions)/sizeof(exceptions[0]);

        for (ExceptionIterator it = exceptions; it != last; ++it) {
            EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
        }
    }
}

TEST_F(MergeHandlerTest, remove_from_diff) {
    framework::defaultimplementation::FakeClock clock;
    MergeStatus status(clock, documentapi::LoadType::DEFAULT, 0, 0);

    std::vector<api::GetBucketDiffCommand::Entry> diff(2);
    diff[0]._timestamp = 1234;
    diff[0]._flags = 0x1;
    diff[0]._hasMask = 0x2;

    diff[1]._timestamp = 5678;
    diff[1]._flags = 0x3;
    diff[1]._hasMask = 0x6;

    status.diff.insert(status.diff.end(), diff.begin(), diff.end());

    {
        std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff(2);
        applyDiff[0]._entry._timestamp = 1234;
        applyDiff[0]._entry._flags = 0x1;
        applyDiff[0]._entry._hasMask = 0x0; // Removed during merging

        applyDiff[1]._entry._timestamp = 5678;
        applyDiff[1]._entry._flags = 0x3;
        applyDiff[1]._entry._hasMask = 0x7;

        EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
        EXPECT_TRUE(status.diff.empty());
    }

    status.diff.insert(status.diff.end(), diff.begin(), diff.end());
    {
        std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff(2);
        applyDiff[0]._entry._timestamp = 1234;
        applyDiff[0]._entry._flags = 0x1;
        applyDiff[0]._entry._hasMask = 0x2;

        applyDiff[1]._entry._timestamp = 5678;
        applyDiff[1]._entry._flags = 0x3;
        applyDiff[1]._entry._hasMask = 0x6;

        EXPECT_FALSE(status.removeFromDiff(applyDiff, 0x7));
        EXPECT_EQ(2, status.diff.size());
    }

    status.diff.clear();
    status.diff.insert(status.diff.end(), diff.begin(), diff.end());
    {
        // Hasmasks have changed, but the diff still remains the same size.
        std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff(2);
        applyDiff[0]._entry._timestamp = 1234;
        applyDiff[0]._entry._flags = 0x1;
        applyDiff[0]._entry._hasMask = 0x1;

        applyDiff[1]._entry._timestamp = 5678;
        applyDiff[1]._entry._flags = 0x3;
        applyDiff[1]._entry._hasMask = 0x5;

        EXPECT_TRUE(status.removeFromDiff(applyDiff, 0x7));
        EXPECT_EQ(2, status.diff.size());
    }
}

TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) {
    setUpChain(BACK);

    document::TestDocMan docMan;
    document::Document::SP doc(docMan.createRandomDocumentAtLocation(_location));
    spi::Timestamp ts(10111);
    doPut(doc, ts);

    MergeHandler handler(getPersistenceProvider(), getEnv());
    std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff;
    {
        api::ApplyBucketDiffCommand::Entry e;
        e._entry._timestamp = ts;
        e._entry._hasMask = 0x1;
        e._docName = doc->getId().toString();
        e._entry._flags = MergeHandler::IN_USE | MergeHandler::DELETED;
        applyDiff.push_back(e);
    }

    auto applyBucketDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(_bucket, _nodes, 1024*1024);
    applyBucketDiffCmd->getDiff() = applyDiff;

    auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, *_context);

    auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(tracker->getReply());
    ASSERT_TRUE(applyBucketDiffReply.get());

    api::MergeBucketCommand cmd(_bucket, _nodes, _maxTimestamp);
    handler.handleMergeBucket(cmd, *_context);

    auto getBucketDiffCmd = fetchSingleMessage<api::GetBucketDiffCommand>();

    // Timestamp should now be a regular remove
    bool foundTimestamp = false;
    for (size_t i = 0; i < getBucketDiffCmd->getDiff().size(); ++i) {
        const api::GetBucketDiffCommand::Entry& e(getBucketDiffCmd->getDiff()[i]);
        if (e._timestamp == ts) {
            EXPECT_EQ(uint16_t(MergeHandler::IN_USE | MergeHandler::DELETED), e._flags);
            foundTimestamp = true;
            break;
        }
    }
    EXPECT_TRUE(foundTimestamp);
}

} // storage