diff options
author | Geir Storli <geirst@yahooinc.com> | 2021-10-22 14:09:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-22 14:09:48 +0200 |
commit | 4eadb28ec148e758562c6627f730f3e4022bb097 (patch) | |
tree | 4d1fa06e02a7e0411049633227fb1136dcd9797d /storage/src | |
parent | 2e24dcc2305544da1a3eb281a302ac15bddec9f2 (diff) | |
parent | 2cb33e26a4950b302fc67c148ebf76db168eeee6 (diff) |
Merge pull request #19671 from vespa-engine/toregge/delay-replies-for-async-apply-bucket-diff
Delay replies for async apply bucket diff.
Diffstat (limited to 'storage/src')
8 files changed, 227 insertions, 42 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index fe77477fa77..d51485df38d 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -11,6 +11,8 @@ using document::DocumentId; using document::test::makeDocumentBucket; +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; namespace storage { @@ -72,6 +74,7 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test public: uint32_t sync_count; DummyMergeBucketInfoSyncer syncer; + MonitoredRefCount monitored_ref_count; ApplyBucketDiffStateTestBase() : ::testing::Test(), @@ -83,7 +86,7 @@ public: ~ApplyBucketDiffStateTestBase(); std::unique_ptr<ApplyBucketDiffState> make_state() { - return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket)); + return std::make_unique<ApplyBucketDiffState>(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 60030004594..017b8ce2b92 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -20,7 +20,12 @@ using namespace ::testing; namespace storage { -struct MergeHandlerTest : SingleDiskPersistenceTestUtils { +/* + * Class for testing merge handler taking async_apply_bucket_diff as + * parameter for the test. 
+ */ +struct MergeHandlerTest : SingleDiskPersistenceTestUtils, + public testing::WithParamInterface<bool> { uint32_t _location; // Location used for all merge tests document::Bucket _bucket; // Bucket used for all merge tests uint64_t _maxTimestamp; @@ -149,8 +154,11 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { int _counter; MessageSenderStub _stub; std::shared_ptr<api::ApplyBucketDiffCommand> _applyCmd; + void convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler& handler); }; + void convert_delayed_error_to_exception(MergeHandler& handler); + std::string doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -159,11 +167,21 @@ struct MergeHandlerTest : SingleDiskPersistenceTestUtils { MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), maxChunkSize, 64, GetParam()); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock()); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), 4190208, 64, GetParam()); + } + + std::shared_ptr<api::StorageMessage> get_queued_reply() { + std::shared_ptr<api::StorageMessage> msg; + if (_replySender.queue.getNext(msg, 0s)) { + return msg; + } else { + return {}; + } + } }; @@ -209,7 +227,7 @@ MergeHandlerTest::setUpChain(ChainPos pos) { // Test a regular merge bucket command fetching data, including // puts, removes, unrevertable removes & duplicates. 
-TEST_F(MergeHandlerTest, merge_bucket_command) { +TEST_P(MergeHandlerTest, merge_bucket_command) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -270,11 +288,11 @@ MergeHandlerTest::testGetBucketDiffChain(bool midChain) EXPECT_EQ(17, diff.size()); } -TEST_F(MergeHandlerTest, get_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_mid_chain) { testGetBucketDiffChain(true); } -TEST_F(MergeHandlerTest, get_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, get_bucket_diff_end_of_chain) { testGetBucketDiffChain(false); } @@ -320,17 +338,17 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain) EXPECT_EQ(0, diff.size()); } -TEST_F(MergeHandlerTest, apply_bucket_diff_mid_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_mid_chain) { testApplyBucketDiffChain(true); } -TEST_F(MergeHandlerTest, apply_bucket_diff_end_of_chain) { +TEST_P(MergeHandlerTest, apply_bucket_diff_end_of_chain) { testApplyBucketDiffChain(false); } // Test that a simplistic merge with one thing to actually merge, // sends correct commands and finish. 
-TEST_F(MergeHandlerTest, master_message_flow) { +TEST_P(MergeHandlerTest, master_message_flow) { MergeHandler handler = createHandler(); LOG(debug, "Handle a merge bucket command"); @@ -424,7 +442,7 @@ getFilledDataSize(const std::vector<api::ApplyBucketDiffCommand::Entry>& diff) } -TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { +TEST_P(MergeHandlerTest, chunked_apply_bucket_diff) { uint32_t docSize = 1024; uint32_t docCount = 10; uint32_t maxChunkSize = docSize * 3; @@ -488,7 +506,7 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) { EXPECT_TRUE(reply->getResult().success()); } -TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { +TEST_P(MergeHandlerTest, chunk_limit_partially_filled_diff) { setUpChain(FRONT); uint32_t docSize = 1024; @@ -524,7 +542,7 @@ TEST_F(MergeHandlerTest, chunk_limit_partially_filled_diff) { EXPECT_LE(getFilledDataSize(fwdDiffCmd->getDiff()), maxChunkSize); } -TEST_F(MergeHandlerTest, max_timestamp) { +TEST_P(MergeHandlerTest, max_timestamp) { doPut(1234, spi::Timestamp(_maxTimestamp + 10), 1024, 1024); MergeHandler handler = createHandler(); @@ -632,7 +650,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, uint16_t hasMask return getBucketDiffCmd; } -TEST_F(MergeHandlerTest, spi_flush_guard) { +TEST_P(MergeHandlerTest, spi_flush_guard) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); @@ -647,13 +665,16 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { try { auto cmd = createDummyApplyDiff(6000); handler.handleApplyBucketDiff(*cmd, createTracker(cmd, _bucket)); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } FAIL() << "No exception thrown on failing in-place remove"; } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } } -TEST_F(MergeHandlerTest, bucket_not_found_in_db) { +TEST_P(MergeHandlerTest, bucket_not_found_in_db) { MergeHandler 
handler = createHandler(); // Send merge for unknown bucket auto cmd = std::make_shared<api::MergeBucketCommand>(makeDocumentBucket(document::BucketId(16, 6789)), _nodes, _maxTimestamp); @@ -661,7 +682,7 @@ TEST_F(MergeHandlerTest, bucket_not_found_in_db) { EXPECT_TRUE(tracker->getResult().isBucketDisappearance()); } -TEST_F(MergeHandlerTest, merge_progress_safe_guard) { +TEST_P(MergeHandlerTest, merge_progress_safe_guard) { MergeHandler handler = createHandler(); auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -684,7 +705,7 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) { EXPECT_EQ(mergeReply->getResult().getResult(), api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { +TEST_P(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { MergeHandler handler = createHandler(); _nodes.clear(); _nodes.emplace_back(0, false); @@ -716,7 +737,7 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) { EXPECT_EQ(0x5, applyBucketDiffCmd2->getDiff()[0]._entry._hasMask); } -TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { +TEST_P(MergeHandlerTest, entry_removed_after_get_bucket_diff) { MergeHandler handler = createHandler(); std::vector<api::ApplyBucketDiffCommand::Entry> applyDiff; { @@ -741,6 +762,23 @@ TEST_F(MergeHandlerTest, entry_removed_after_get_bucket_diff) { EXPECT_EQ(0x0, diff[0]._entry._hasMask); } +void +MergeHandlerTest::convert_delayed_error_to_exception(MergeHandler& handler) +{ + handler.drain_async_writes(); + if (getEnv()._fileStorHandler.isMerging(_bucket)) { + auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket); + api::ReturnCode return_code; + s->check_delayed_error(return_code); + if (return_code.failed()) { + getEnv()._fileStorHandler.clearMergeStatus(_bucket, return_code); + fetchSingleMessage<api::ApplyBucketDiffReply>(); + 
fetchSingleMessage<api::ApplyBucketDiffCommand>(); + throw std::runtime_error(return_code.getMessage()); + } + } +} + std::string MergeHandlerTest::doTestSPIException(MergeHandler& handler, PersistenceProviderWrapper& providerWrapper, @@ -755,6 +793,9 @@ MergeHandlerTest::doTestSPIException(MergeHandler& handler, providerWrapper.setFailureMask(failureMask); try { invoker.invoke(*this, handler, *_context); + if (GetParam()) { + convert_delayed_error_to_exception(handler); + } if (failureMask != 0) { return (std::string("No exception was thrown during handler " "invocation. Expected exception containing '") @@ -823,7 +864,7 @@ MergeHandlerTest::HandleMergeBucketInvoker::invoke( handler.handleMergeBucket(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, merge_bucket_spi_failures) { +TEST_P(MergeHandlerTest, merge_bucket_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -855,7 +896,7 @@ MergeHandlerTest::HandleGetBucketDiffInvoker::invoke( handler.handleGetBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, get_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult(spi::Result(spi::Result::ErrorType::PERMANENT_ERROR, "who you gonna call?")); @@ -888,7 +929,7 @@ MergeHandlerTest::HandleApplyBucketDiffInvoker::invoke( handler.handleApplyBucketDiff(*cmd, test.createTracker(cmd, test._bucket)); } -TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -953,7 +994,7 @@ 
MergeHandlerTest::HandleGetBucketDiffReplyInvoker::afterInvoke( api::ReturnCode::INTERNAL_FAILURE); } -TEST_F(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, get_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler = createHandler(providerWrapper); providerWrapper.setResult( @@ -1006,6 +1047,18 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::beforeInvoke( } void +MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::convert_delayed_error_to_exception(MergeHandlerTest& test, MergeHandler &handler) +{ + handler.drain_async_writes(); + if (!_stub.replies.empty() && _stub.replies.back()->getResult().failed()) { + auto chained_reply = _stub.replies.back(); + _stub.replies.pop_back(); + test.messageKeeper().sendReply(chained_reply); + throw std::runtime_error(chained_reply->getResult().getMessage()); + } +} + +void MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( MergeHandlerTest& test, MergeHandler& handler, @@ -1016,6 +1069,9 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke( test.fillDummyApplyDiff(reply->getDiff()); _stub.clear(); handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket)); + if (test.GetParam()) { + convert_delayed_error_to_exception(test, handler); + } } std::string @@ -1039,7 +1095,7 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::afterInvoke( } } -TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { +TEST_P(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); HandleApplyBucketDiffReplyInvoker invoker; for (int i = 0; i < 2; ++i) { @@ -1066,7 +1122,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { } } -TEST_F(MergeHandlerTest, remove_from_diff) { +TEST_P(MergeHandlerTest, remove_from_diff) { framework::defaultimplementation::FakeClock clock; MergeStatus status(clock, 
0, 0); @@ -1132,7 +1188,7 @@ TEST_F(MergeHandlerTest, remove_from_diff) { } } -TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { +TEST_P(MergeHandlerTest, remove_put_on_existing_timestamp) { setUpChain(BACK); document::TestDocMan docMan; @@ -1156,8 +1212,15 @@ TEST_F(MergeHandlerTest, remove_put_on_existing_timestamp) { auto tracker = handler.handleApplyBucketDiff(*applyBucketDiffCmd, createTracker(applyBucketDiffCmd, _bucket)); - auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); - ASSERT_TRUE(applyBucketDiffReply.get()); + if (GetParam()) { + ASSERT_FALSE(tracker); + handler.drain_async_writes(); + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(get_queued_reply()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } else { + auto applyBucketDiffReply = std::dynamic_pointer_cast<api::ApplyBucketDiffReply>(std::move(*tracker).stealReplySP()); + ASSERT_TRUE(applyBucketDiffReply.get()); + } auto cmd = std::make_shared<api::MergeBucketCommand>(_bucket, _nodes, _maxTimestamp); handler.handleMergeBucket(*cmd, createTracker(cmd, _bucket)); @@ -1246,7 +1309,7 @@ std::ostream &operator<<(std::ostream &os, const GetBucketDiffCommand::Entry &en } -TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) +TEST_P(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) { using NodeList = decltype(_nodes); // Redundancy is 2 and source only nodes 3 and 4 have doc1 and doc2 @@ -1382,4 +1445,6 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply) LOG(debug, "got mergebucket reply"); } +VESPA_GTEST_INSTANTIATE_TEST_SUITE_P(AsyncApplyBucketDiffParams, MergeHandlerTest, testing::Values(false, true)); + } // storage diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index eb7a5ef5bc6..ad153c41aef 100644 --- 
a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -2,21 +2,27 @@ #include "apply_bucket_diff_state.h" #include "mergehandler.h" +#include "persistenceutil.h" #include <vespa/document/base/documentid.h> #include <vespa/persistence/spi/result.h> #include <vespa/vespalib/stllike/asciistream.h> using storage::spi::Result; +using vespalib::RetainGuard; namespace storage { -ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket) +ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), _bucket(bucket), _fail_message(), _failed_flag(), _stale_bucket_info(false), - _promise() + _promise(), + _tracker(), + _delayed_reply(), + _sender(nullptr), + _retain_guard(std::move(retain_guard)) { } @@ -32,6 +38,17 @@ ApplyBucketDiffState::~ApplyBucketDiffState() if (_promise.has_value()) { _promise.value().set_value(_fail_message); } + if (_delayed_reply) { + if (!_delayed_reply->getResult().failed() && !_fail_message.empty()) { + _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message)); + } + if (_sender) { + _sender->sendReply(std::move(_delayed_reply)); + } else { + // _tracker->_reply and _delayed_reply point to the same reply. 
+ _tracker->sendReply(); + } + } } void @@ -69,4 +86,19 @@ ApplyBucketDiffState::get_future() return _promise.value().get_future(); } +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _delayed_reply = std::move(delayed_reply); +} + +void +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply) +{ + _tracker = std::move(tracker); + _sender = &sender; + _delayed_reply = std::move(delayed_reply); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index af4174b06d6..39f60156e66 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,16 +3,20 @@ #pragma once #include <vespa/persistence/spi/bucket.h> +#include <vespa/vespalib/util/retain_guard.h> #include <future> #include <memory> #include <vector> namespace document { class DocumentId; } +namespace storage::api { class StorageReply; } namespace storage::spi { class Result; } namespace storage { class ApplyBucketDiffEntryResult; +class MessageSender; +class MessageTracker; class MergeBucketInfoSyncer; /* @@ -26,9 +30,13 @@ class ApplyBucketDiffState { std::atomic_flag _failed_flag; bool _stale_bucket_info; std::optional<std::promise<vespalib::string>> _promise; + std::unique_ptr<MessageTracker> _tracker; + std::shared_ptr<api::StorageReply> _delayed_reply; + MessageSender* _sender; + vespalib::RetainGuard _retain_guard; public: - ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket); + ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); 
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -36,6 +44,8 @@ public: void mark_stale_bucket_info(); void sync_bucket_info(); std::future<vespalib::string> get_future(); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); }; } diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp index febac5b87e5..1d9e0c6fae6 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.cpp @@ -14,6 +14,7 @@ MergeStatus::MergeStatus(const framework::Clock& clock, uint32_t traceLevel) : reply(), full_node_list(), nodeList(), maxTimestamp(0), diff(), pendingId(0), pendingGetDiff(), pendingApplyDiff(), timeout(0), startTime(clock), + delayed_error(), context(priority, traceLevel) {} @@ -122,4 +123,23 @@ MergeStatus::print(std::ostream& out, bool verbose, } } +void +MergeStatus::set_delayed_error(std::future<vespalib::string>&& delayed_error_in) +{ + delayed_error = std::move(delayed_error_in); +} + +void +MergeStatus::check_delayed_error(api::ReturnCode &return_code) +{ + if (!return_code.failed() && delayed_error.has_value()) { + // Wait for pending writes to local node to complete and check error + auto& future_error = delayed_error.value(); + future_error.wait(); + vespalib::string fail_message = future_error.get(); + delayed_error.reset(); + return_code = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, std::move(fail_message)); + } +} + }; diff --git a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h index 
b28ca4e373a..05ffd1336a2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/mergestatus.h +++ b/storage/src/vespa/storage/persistence/filestorage/mergestatus.h @@ -9,7 +9,9 @@ #include <vector> #include <deque> +#include <future> #include <memory> +#include <optional> namespace storage { @@ -25,6 +27,7 @@ public: std::shared_ptr<api::ApplyBucketDiffReply> pendingApplyDiff; vespalib::duration timeout; framework::MilliSecTimer startTime; + std::optional<std::future<vespalib::string>> delayed_error; spi::Context context; MergeStatus(const framework::Clock&, api::StorageMessage::Priority, uint32_t traceLevel); @@ -40,6 +43,8 @@ public: bool removeFromDiff(const std::vector<api::ApplyBucketDiffCommand::Entry>& part, uint16_t hasMask, const std::vector<api::MergeBucketCommand::Node> &nodes); void print(std::ostream& out, bool verbose, const std::string& indent) const override; bool isFirstNode() const { return static_cast<bool>(reply); } + void set_delayed_error(std::future<vespalib::string>&& delayed_error_in); + void check_delayed_error(api::ReturnCode &return_code); }; } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 17a16487ac4..6d58966d415 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -17,6 +17,9 @@ #include <vespa/log/log.h> LOG_SETUP(".persistence.mergehandler"); +using vespalib::MonitoredRefCount; +using vespalib::RetainGuard; + namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, @@ -28,12 +31,19 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), + _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), 
_async_apply_bucket_diff(async_apply_bucket_diff) { } +MergeHandler::~MergeHandler() +{ + drain_async_writes(); +} + + namespace { constexpr int getDeleteFlag() { @@ -674,7 +684,8 @@ namespace { api::StorageReply::SP MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, - MessageSender& sender, spi::Context& context) const + MessageSender& sender, spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const { // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { @@ -806,6 +817,10 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, } cmd->setPriority(status.context.getPriority()); cmd->setTimeout(status.timeout); + if (async_results) { + // Check currently pending writes to local node before sending new command. + check_apply_diff_sync(std::move(async_results)); + } if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) { framework::MilliSecTimer startTime(_clock); fetchLocalData(bucket, cmd->getDiff(), 0, context); @@ -1171,7 +1186,8 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe reply.getDiff().begin(), reply.getDiff().end()); - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + std::shared_ptr<ApplyBucketDiffState> async_results; + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on, and shouldn't reply now. 
@@ -1211,7 +1227,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra tracker->setMetric(_env._metrics.applyBucketDiff); spi::Bucket bucket(cmd.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; LOG(debug, "%s", cmd.toString().c_str()); if (_env._fileStorHandler.isMerging(bucket.getBucket())) { @@ -1233,10 +1249,13 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { framework::MilliSecTimer startTime(_clock); + async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); + if (!_async_apply_bucket_diff) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1260,10 +1279,14 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } } - tracker->setReply(std::make_shared<api::ApplyBucketDiffReply>(cmd)); + auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd); + tracker->setReply(reply); static_cast<api::ApplyBucketDiffReply&>(tracker->getReply()).getDiff().swap(cmd.getDiff()); LOG(spam, "Replying to ApplyBucketDiff for %s to node %d.", bucket.toString().c_str(), cmd.getNodes()[index - 1].index); + if (async_results) { + async_results->set_delayed_reply(std::move(tracker), std::move(reply)); + } } else { // When not the last node in merge chain, we must save reply, and // send command on. 
@@ -1280,6 +1303,10 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra cmd2->setPriority(cmd.getPriority()); cmd2->setTimeout(cmd.getTimeout()); s->pendingId = cmd2->getMsgId(); + if (async_results) { + // Reply handler should check for delayed error. + s->set_delayed_error(async_results->get_future()); + } _env._fileStorHandler.sendCommand(cmd2); // Everything went fine. Don't delete state but wait for reply stateGuard.deactivate(); @@ -1290,12 +1317,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra } void -MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const +MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, MessageSender& sender, MessageTracker::UP tracker) const { - (void) tracker; _env._metrics.applyBucketDiffReply.inc(); spi::Bucket bucket(reply.getBucket()); - auto async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket); + std::shared_ptr<ApplyBucketDiffState> async_results; std::vector<api::ApplyBucketDiffCommand::Entry>& diff(reply.getDiff()); LOG(debug, "%s", reply.toString().c_str()); @@ -1316,6 +1342,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag api::StorageReply::SP replyToSend; // Process apply bucket diff locally api::ReturnCode returnCode = reply.getResult(); + // Check for delayed error from handleApplyBucketDiff + s->check_delayed_error(returnCode); try { if (reply.getResult().failed()) { LOG(debug, "Got failed apply bucket diff reply %s", reply.toString().c_str()); @@ -1329,9 +1357,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } if (applyDiffHasLocallyNeededData(diff, index)) { framework::MilliSecTimer startTime(_clock); + async_results = std::make_shared<ApplyBucketDiffState>(*this, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, 
async_results); + if (!_async_apply_bucket_diff) { + check_apply_diff_sync(std::move(async_results)); + } _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); - check_apply_diff_sync(std::move(async_results)); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1370,7 +1401,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag // Should reply now, since we failed. replyToSend = s->reply; } else { - replyToSend = processBucketMerge(bucket, *s, sender, s->context); + replyToSend = processBucketMerge(bucket, *s, sender, s->context, async_results); if (!replyToSend.get()) { // We have sent something on and shouldn't reply now. @@ -1392,6 +1423,10 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag throw; } + if (async_results && replyToSend) { + replyToSend->setResult(returnCode); + async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend)); + } if (clearState) { _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); } @@ -1402,4 +1437,13 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag } } +void +MergeHandler::drain_async_writes() +{ + if (_monitored_ref_count) { + // Wait for related ApplyBucketDiffState objects to be destroyed + _monitored_ref_count->waitForZeroRefCount(); + } +} + } // storage diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f6e8ddcf306..cc06f769812 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -20,6 +20,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/vespalib/util/monitored_refcount.h> namespace storage { @@ -48,6 +49,8 @@ public: uint32_t 
commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); + ~MergeHandler(); + bool buildBucketInfoList( const spi::Bucket& bucket, Timestamp maxTimestamp, @@ -70,12 +73,14 @@ public: void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const; MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const; void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; + void drain_async_writes(); private: const framework::Clock &_clock; const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; + std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; const bool _async_apply_bucket_diff; @@ -84,7 +89,8 @@ private: api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, - spi::Context& context) const; + spi::Context& context, + std::shared_ptr<ApplyBucketDiffState>& async_results) const; /** * Invoke either put, remove or unrevertable remove on the SPI |